[v5,2/3] block: io-uring: add READV_PI/WRITEV_PI operations

Message ID: 20220920144618.1111138-3-a.buev@yadro.com
State: New
Series: implement direct IO with integrity

Commit Message

Alexander V. Buev Sept. 20, 2022, 2:46 p.m. UTC
Added new READV_PI/WRITEV_PI operations to io_uring.
Added new pi_addr and pi_len fields to the SQE struct.
Added a new IOCB_USE_PI flag to struct kiocb.
Use the kiocb->private pointer to pass the PI data iterator
to the lower layer.
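
A minimal userspace sketch of submitting one of the new operations
(illustrative only: it assumes the UAPI additions from this patch, a
block device opened with O_DIRECT, and liburing for ring setup; buffer
names, sizes, alignment and error handling are placeholders):

    struct iovec data_iov = { .iov_base = data_buf, .iov_len = 4096 };
    struct iovec pi_iov   = { .iov_base = pi_buf,   .iov_len = 8 };
    struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);

    memset(sqe, 0, sizeof(*sqe));
    sqe->opcode  = IORING_OP_READV_PI;
    sqe->fd      = fd;                       /* O_DIRECT block device */
    sqe->addr    = (unsigned long)&data_iov; /* data iovec array */
    sqe->len     = 1;                        /* number of data iovecs */
    sqe->off     = 0;                        /* byte offset on device */
    sqe->pi_addr = (unsigned long)&pi_iov;   /* PI buffer iovec array */
    sqe->pi_len  = 1;                        /* number of PI iovecs */

    io_uring_submit(&ring);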

Signed-off-by: Alexander V. Buev <a.buev@yadro.com>
---
 include/linux/fs.h            |   1 +
 include/uapi/linux/io_uring.h |   6 +
 include/uapi/linux/uio.h      |   3 +-
 io_uring/Makefile             |   3 +-
 io_uring/io_uring.c           |   2 +
 io_uring/opdef.c              |  27 ++
 io_uring/rw.h                 |   4 +
 io_uring/rw_pi.c              | 630 ++++++++++++++++++++++++++++++++++
 io_uring/rw_pi.h              |  34 ++
 9 files changed, 708 insertions(+), 2 deletions(-)
 create mode 100644 io_uring/rw_pi.c
 create mode 100644 io_uring/rw_pi.h

Comments

Jens Axboe Sept. 21, 2022, 5:59 p.m. UTC | #1
On 9/20/22 8:46 AM, Alexander V. Buev wrote:
> Added new READV_PI/WRITEV_PI operations to io_uring.
> Added new pi_addr and pi_len fields to the SQE struct.
> Added a new IOCB_USE_PI flag to struct kiocb.
> Use the kiocb->private pointer to pass the PI data iterator
> to the lower layer.

Minor nit - please format commit message lines to 72-74 chars.

In general, I think this feature is useful. I do echo Keith's response
that it should probably be named a bit differently, as PI is just one
use case of this.

But for this patch in particular, not a huge fan of the rote copying of
rw.c into a new file. Now we have to patch two different spots whenever
a bug is found in there, that's not very maintainable. I do appreciate
the fact that this keeps the PI work out of the fast path for
read/write, but I do think this warrants a bit of refactoring work first
to ensure that there are helpers that can be shared between rw and
rw_pi. That definitely needs to be solved before this can be considered
for inclusion.
Alexander V. Buev Sept. 22, 2022, 12:48 p.m. UTC | #2
> In general, I think this feature is useful. I do echo Keith's response
> that it should probably be named a bit differently, as PI is just one
> use case of this.
Accepted.
In the next version, the "pi" suffix will be renamed to "meta"
(meta_addr, meta_len, READV_META, WRITEV_META, etc.).
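
For concreteness, a hypothetical sketch of the renamed SQE fields and
opcodes (v6 has not been posted; the names simply follow the sentence
above):

    struct {
            __u64   meta_addr;
            __u32   meta_len;
    };

    IORING_OP_READV_META,
    IORING_OP_WRITEV_META,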


> But for this patch in particular, not a huge fan of the rote copying of
> rw.c into a new file. Now we have to patch two different spots whenever
> a bug is found in there, that's not very maintainable. I do appreciate
> the fact that this keeps the PI work out of the fast path for
> read/write, but I do think this warrants a bit of refactoring work first
> to ensure that there are helpers that can be shared between rw and
> rw_pi. That definitely needs to be solved before this can be considered
> for inclusion.
I think it would be better to move some of the shared code to another
file, for example "rw_common.[ch]". What do you think?
As an alternative, I can leave that code in "rw.[ch]" as is.
Jens Axboe Sept. 22, 2022, 2:08 p.m. UTC | #3
>> But for this patch in particular, not a huge fan of the rote copying of
>> rw.c into a new file. Now we have to patch two different spots whenever
>> a bug is found in there, that's not very maintainable. I do appreciate
>> the fact that this keeps the PI work out of the fast path for
>> read/write, but I do think this warrants a bit of refactoring work first
>> to ensure that there are helpers that can be shared between rw and
>> rw_pi. That definitely needs to be solved before this can be considered
>> for inclusion.
> I think it would be better to move some of the shared code to another
> file, for example "rw_common.[ch]". What do you think?
> As an alternative, I can leave that code in "rw.[ch]" as is.

That's basically what I'm suggesting, at least that would be one way to
do it. And the best one imho. So yes, let's go ahead and do that.
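
For concreteness, the agreed split might look like a new
"io_uring/rw_common.h" exposing the helpers currently duplicated
between rw.c and rw_pi.c (hypothetical sketch; the names follow the
functions in this patch, but the exact set of shared helpers is up to
the refactoring):

    /* io_uring/rw_common.h - hypothetical shared read/write helpers */
    #ifndef IOU_RW_COMMON_H
    #define IOU_RW_COMMON_H

    #include <linux/fs.h>

    /* completion and reissue plumbing shared by rw.c and rw_pi.c */
    void io_rw_done(struct kiocb *kiocb, ssize_t ret);
    bool io_rw_should_reissue(struct io_kiocb *req);
    loff_t *io_kiocb_update_pos(struct io_kiocb *req);
    unsigned int io_fixup_rw_res(struct io_kiocb *req, long res);

    #endif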

Patch

diff --git a/include/linux/fs.h b/include/linux/fs.h
index 9eced4cc286e..a28b12a22750 100644
--- a/include/linux/fs.h
+++ b/include/linux/fs.h
@@ -337,6 +337,7 @@  enum rw_hint {
 #define IOCB_NOIO		(1 << 20)
 /* can use bio alloc cache */
 #define IOCB_ALLOC_CACHE	(1 << 21)
+#define IOCB_USE_PI		(1 << 22)
 
 struct kiocb {
 	struct file		*ki_filp;
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index 6b83177fd41d..a4158e48cecb 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -80,6 +80,10 @@  struct io_uring_sqe {
 			__u64	addr3;
 			__u64	__pad2[1];
 		};
+		struct {
+			__u64	pi_addr;
+			__u32	pi_len;
+		};
 		/*
 		 * If the ring is initialized with IORING_SETUP_SQE128, then
 		 * this field is used for 80 bytes of arbitrary command data
@@ -206,6 +210,8 @@  enum io_uring_op {
 	IORING_OP_SOCKET,
 	IORING_OP_URING_CMD,
 	IORING_OP_SEND_ZC,
+	IORING_OP_READV_PI,
+	IORING_OP_WRITEV_PI,
 
 	/* this goes last, obviously */
 	IORING_OP_LAST,
diff --git a/include/uapi/linux/uio.h b/include/uapi/linux/uio.h
index 059b1a9147f4..c9eaaa6cdb0f 100644
--- a/include/uapi/linux/uio.h
+++ b/include/uapi/linux/uio.h
@@ -23,9 +23,10 @@  struct iovec
 /*
  *	UIO_MAXIOV shall be at least 16 1003.1g (5.4.1.1)
  */
- 
+
 #define UIO_FASTIOV	8
 #define UIO_MAXIOV	1024
+#define UIO_FASTIOV_PI	1
 
 
 #endif /* _UAPI__LINUX_UIO_H */
diff --git a/io_uring/Makefile b/io_uring/Makefile
index 8cc8e5387a75..8c01546c2bcf 100644
--- a/io_uring/Makefile
+++ b/io_uring/Makefile
@@ -7,5 +7,6 @@  obj-$(CONFIG_IO_URING)		+= io_uring.o xattr.o nop.o fs.o splice.o \
 					openclose.o uring_cmd.o epoll.o \
 					statx.o net.o msg_ring.o timeout.o \
 					sqpoll.o fdinfo.o tctx.o poll.o \
-					cancel.o kbuf.o rsrc.o rw.o opdef.o notif.o
+					cancel.o kbuf.o rsrc.o rw.o opdef.o \
+					notif.o rw_pi.o
 obj-$(CONFIG_IO_WQ)		+= io-wq.o
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index b9640ad5069f..b2f451e18646 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -3927,7 +3927,9 @@  static int __init io_uring_init(void)
 	BUILD_BUG_SQE_ELEM(44, __u16,  addr_len);
 	BUILD_BUG_SQE_ELEM(46, __u16,  __pad3[0]);
 	BUILD_BUG_SQE_ELEM(48, __u64,  addr3);
+	BUILD_BUG_SQE_ELEM(48, __u64,  pi_addr);
 	BUILD_BUG_SQE_ELEM_SIZE(48, 0, cmd);
+	BUILD_BUG_SQE_ELEM(56, __u32,  pi_len);
 	BUILD_BUG_SQE_ELEM(56, __u64,  __pad2);
 
 	BUILD_BUG_ON(sizeof(struct io_uring_files_update) !=
diff --git a/io_uring/opdef.c b/io_uring/opdef.c
index c61494e0a602..da2b12a44995 100644
--- a/io_uring/opdef.c
+++ b/io_uring/opdef.c
@@ -33,6 +33,7 @@ 
 #include "poll.h"
 #include "cancel.h"
 #include "rw.h"
+#include "rw_pi.h"
 
 static int io_no_issue(struct io_kiocb *req, unsigned int issue_flags)
 {
@@ -488,6 +489,32 @@  const struct io_op_def io_op_defs[] = {
 		.prep			= io_eopnotsupp_prep,
 #endif
 	},
+	[IORING_OP_READV_PI] = {
+		.needs_file		= 1,
+		.plug			= 1,
+		.audit_skip		= 1,
+		.ioprio			= 1,
+		.iopoll			= 1,
+		.async_size		= sizeof(struct io_async_rw_pi),
+		.name			= "READV_PI",
+		.prep			= io_prep_rw_pi,
+		.issue			= io_readv_pi,
+		.prep_async		= io_readv_pi_prep_async,
+		.cleanup		= io_readv_writev_pi_cleanup,
+	},
+	[IORING_OP_WRITEV_PI] = {
+		.needs_file		= 1,
+		.plug			= 1,
+		.audit_skip		= 1,
+		.ioprio			= 1,
+		.iopoll			= 1,
+		.async_size		= sizeof(struct io_async_rw_pi),
+		.name			= "WRITEV_PI",
+		.prep			= io_prep_rw_pi,
+		.issue			= io_writev_pi,
+		.prep_async		= io_writev_pi_prep_async,
+		.cleanup		= io_readv_writev_pi_cleanup,
+	},
 };
 
 const char *io_uring_get_opcode(u8 opcode)
diff --git a/io_uring/rw.h b/io_uring/rw.h
index 0204c3fcafa5..c00ece398540 100644
--- a/io_uring/rw.h
+++ b/io_uring/rw.h
@@ -1,4 +1,6 @@ 
 // SPDX-License-Identifier: GPL-2.0
+#ifndef IOU_RW_H
+#define IOU_RW_H
 
 #include <linux/pagemap.h>
 
@@ -21,3 +23,5 @@  int io_readv_prep_async(struct io_kiocb *req);
 int io_write(struct io_kiocb *req, unsigned int issue_flags);
 int io_writev_prep_async(struct io_kiocb *req);
 void io_readv_writev_cleanup(struct io_kiocb *req);
+
+#endif
diff --git a/io_uring/rw_pi.c b/io_uring/rw_pi.c
new file mode 100644
index 000000000000..ecee133d709c
--- /dev/null
+++ b/io_uring/rw_pi.c
@@ -0,0 +1,630 @@ 
+// SPDX-License-Identifier: GPL-2.0
+#include <linux/kernel.h>
+#include <linux/errno.h>
+#include <linux/fs.h>
+#include <linux/file.h>
+#include <linux/blk-mq.h>
+#include <linux/mm.h>
+#include <linux/slab.h>
+#include <linux/fsnotify.h>
+#include <linux/poll.h>
+#include <linux/nospec.h>
+#include <linux/compat.h>
+#include <linux/io_uring.h>
+
+#include <uapi/linux/io_uring.h>
+
+#include "io_uring.h"
+#include "opdef.h"
+#include "rw_pi.h"
+
+#define io_kiocb_to_kiocb(req, type) \
+				(&((type *)io_kiocb_to_cmd(req, type))->kiocb)
+#define DATA	(0)
+#define PI	(1)
+
+#define u64_to_ptr(x)	(		\
+{					\
+	typecheck(u64, (x));		\
+	(void *)(uintptr_t)(x);		\
+})
+
+struct io_rw_pi {
+	struct kiocb			kiocb;
+	u64				addr;
+	u32				nr_segs;
+	u32				nr_pi_segs;
+};
+
+static inline
+void io_rw_pi_state_iter_restore(struct io_rw_state *data, struct __io_rw_pi_state *pi)
+{
+	iov_iter_restore(&data->iter, &data->iter_state);
+	iov_iter_restore(&pi->iter, &pi->iter_state);
+}
+
+static inline
+void io_rw_pi_state_iter_save(struct io_rw_state *data, struct __io_rw_pi_state *pi)
+{
+	iov_iter_save_state(&data->iter, &data->iter_state);
+	iov_iter_save_state(&pi->iter, &pi->iter_state);
+}
+
+static inline bool io_file_supports_nowait(struct io_kiocb *req)
+{
+	return req->flags & REQ_F_SUPPORT_NOWAIT;
+}
+
+static inline void io_rw_done(struct kiocb *kiocb, ssize_t ret)
+{
+	switch (ret) {
+	case -EIOCBQUEUED:
+		break;
+	case -ERESTARTSYS:
+	case -ERESTARTNOINTR:
+	case -ERESTARTNOHAND:
+	case -ERESTART_RESTARTBLOCK:
+		/*
+		 * We can't just restart the syscall, since previously
+		 * submitted sqes may already be in progress. Just fail this
+		 * IO with EINTR.
+		 */
+		ret = -EINTR;
+		fallthrough;
+	default:
+		kiocb->ki_complete(kiocb, ret);
+	}
+}
+
+static inline loff_t *io_kiocb_update_pos(struct io_kiocb *req)
+{
+	struct io_rw_pi *rw = io_kiocb_to_cmd(req, struct io_rw_pi);
+
+	if (rw->kiocb.ki_pos != -1)
+		return &rw->kiocb.ki_pos;
+
+	if (!(req->file->f_mode & FMODE_STREAM)) {
+		req->flags |= REQ_F_CUR_POS;
+		rw->kiocb.ki_pos = req->file->f_pos;
+		return &rw->kiocb.ki_pos;
+	}
+
+	rw->kiocb.ki_pos = 0;
+	return NULL;
+}
+
+static void io_req_task_queue_reissue(struct io_kiocb *req)
+{
+	req->io_task_work.func = io_queue_iowq;
+	io_req_task_work_add(req);
+}
+
+static bool io_resubmit_prep(struct io_kiocb *req)
+{
+	struct io_async_rw_pi *arw = req->async_data;
+
+	if (!req_has_async_data(req))
+		return !io_req_prep_async(req);
+	io_rw_pi_state_iter_restore(&arw->data.s, &arw->pi.s);
+	return true;
+}
+
+static bool io_rw_should_reissue(struct io_kiocb *req)
+{
+	struct io_ring_ctx *ctx = req->ctx;
+
+	if ((req->flags & REQ_F_NOWAIT) || (io_wq_current_is_worker() &&
+	    !(ctx->flags & IORING_SETUP_IOPOLL)))
+		return false;
+	/*
+	 * If ref is dying, we might be running poll reap from the exit work.
+	 * Don't attempt to reissue from that path, just let it fail with
+	 * -EAGAIN.
+	 */
+	if (percpu_ref_is_dying(&ctx->refs))
+		return false;
+	/*
+	 * Play it safe and assume not safe to re-import and reissue if we're
+	 * not in the original thread group (or in task context).
+	 */
+	if (!same_thread_group(req->task, current) || !in_task())
+		return false;
+	return true;
+}
+
+static bool __io_complete_rw_common(struct io_kiocb *req, long res)
+{
+	struct io_rw_pi *rw = io_kiocb_to_cmd(req, struct io_rw_pi);
+
+	if (rw->kiocb.ki_flags & IOCB_WRITE)
+		fsnotify_modify(req->file);
+	else
+		fsnotify_access(req->file);
+
+	if (unlikely(res != req->cqe.res)) {
+		if ((res == -EAGAIN || res == -EOPNOTSUPP) &&
+		    io_rw_should_reissue(req)) {
+			req->flags |= REQ_F_REISSUE | REQ_F_PARTIAL_IO;
+			return true;
+		}
+		req_set_fail(req);
+		req->cqe.res = res;
+	}
+	return false;
+}
+
+static inline
+unsigned int io_fixup_rw_res(struct io_kiocb *req, long res)
+{
+	struct io_async_rw_pi *arw = req->async_data;
+
+	/* add previously done IO, if any */
+	if (req_has_async_data(req) && arw->data.bytes_done > 0) {
+		if (res < 0)
+			res = arw->data.bytes_done;
+		else
+			res += arw->data.bytes_done;
+	}
+	return res;
+}
+
+static void io_complete_rw(struct kiocb *kiocb, long res)
+{
+	struct io_rw_pi *rw = container_of(kiocb, struct io_rw_pi, kiocb);
+	struct io_kiocb *req = cmd_to_io_kiocb(rw);
+
+	if (__io_complete_rw_common(req, res))
+		return;
+	io_req_set_res(req, io_fixup_rw_res(req, res), 0);
+	req->io_task_work.func = io_req_task_complete;
+	io_req_task_work_add(req);
+}
+
+static void io_complete_rw_iopoll(struct kiocb *kiocb, long res)
+{
+	struct io_rw_pi *rw = container_of(kiocb, struct io_rw_pi, kiocb);
+	struct io_kiocb *req = cmd_to_io_kiocb(rw);
+
+	if (unlikely(res != req->cqe.res)) {
+		if (res == -EAGAIN && io_rw_should_reissue(req)) {
+			req->flags |= REQ_F_REISSUE | REQ_F_PARTIAL_IO;
+			return;
+		}
+		req->cqe.res = res;
+	}
+
+	/* order with io_iopoll_complete() checking ->iopoll_completed */
+	smp_store_release(&req->iopoll_completed, 1);
+}
+
+static int kiocb_done(struct io_kiocb *req, ssize_t ret,
+		       unsigned int issue_flags)
+{
+	struct io_rw_pi *rw = io_kiocb_to_cmd(req, struct io_rw_pi);
+	unsigned int final_ret = io_fixup_rw_res(req, ret);
+
+	if (req->flags & REQ_F_CUR_POS)
+		req->file->f_pos = rw->kiocb.ki_pos;
+	if (ret >= 0 && (rw->kiocb.ki_complete == io_complete_rw)) {
+		if (!__io_complete_rw_common(req, ret)) {
+			io_req_set_res(req, final_ret, 0);
+			return IOU_OK;
+		}
+	} else {
+		io_rw_done(&rw->kiocb, ret);
+	}
+
+	if (req->flags & REQ_F_REISSUE) {
+		req->flags &= ~REQ_F_REISSUE;
+		if (io_resubmit_prep(req))
+			io_req_task_queue_reissue(req);
+		else
+			io_req_task_queue_fail(req, ret);
+	}
+	return IOU_ISSUE_SKIP_COMPLETE;
+}
+
+int io_prep_rw_pi(struct io_kiocb *req, const struct io_uring_sqe *sqe)
+{
+	struct io_rw_pi *rw = io_kiocb_to_cmd(req, struct io_rw_pi);
+	struct kiocb *kiocb = &rw->kiocb;
+	unsigned int ioprio;
+	int ret;
+
+	kiocb->ki_flags = 0;
+	ret = kiocb_set_rw_flags(kiocb, READ_ONCE(sqe->rw_flags));
+	if (unlikely(ret))
+		return ret;
+
+	kiocb->ki_pos = READ_ONCE(sqe->off);
+
+	ioprio = READ_ONCE(sqe->ioprio);
+	if (ioprio) {
+		ret = ioprio_check_cap(ioprio);
+		if (ret)
+			return ret;
+
+		kiocb->ki_ioprio = ioprio;
+	} else {
+		kiocb->ki_ioprio = get_current_ioprio();
+	}
+
+	req->imu = NULL;
+
+	/* save data iovec pointer & len */
+	rw->addr = (uintptr_t)READ_ONCE(sqe->addr);
+	rw->nr_segs = READ_ONCE(sqe->len);
+
+	/* save pi iovec pointer & len */
+	rw->kiocb.private = u64_to_ptr(READ_ONCE(sqe->pi_addr));
+	rw->nr_pi_segs = READ_ONCE(sqe->pi_len);
+
+	kiocb->ki_flags |= IOCB_USE_PI;
+
+	return 0;
+}
+
+
+static inline int
+io_import_iovecs_pi(int io_dir, struct io_kiocb *req, struct iovec **iovec,
+			struct io_rw_state *s_data, struct __io_rw_pi_state *s_pi)
+{
+	struct io_rw_pi *rw = io_kiocb_to_cmd(req, struct io_rw_pi);
+	struct iovec __user *uvec;
+	ssize_t ret;
+
+	/* data */
+	uvec = (struct iovec __user *)u64_to_user_ptr(rw->addr);
+	iovec[DATA] = s_data->fast_iov;
+	ret = __import_iovec(io_dir, uvec, rw->nr_segs,
+				UIO_FASTIOV, iovec + DATA,
+				&s_data->iter, req->ctx->compat);
+
+	if (unlikely(ret <= 0))
+		return (ret) ? ret : -EINVAL;
+	/* pi */
+	uvec = (struct iovec __user *)rw->kiocb.private;
+	iovec[PI] = s_pi->fast_iov;
+	ret = __import_iovec(io_dir, uvec, rw->nr_pi_segs,
+				UIO_FASTIOV_PI, iovec + PI,
+				&s_pi->iter, req->ctx->compat);
+	if (unlikely(ret <= 0)) {
+		if (iovec[DATA])
+			kfree(iovec[DATA]);
+		return (ret) ? ret : -EINVAL;
+	}
+
+	/* save states */
+	io_rw_pi_state_iter_save(s_data, s_pi);
+
+	return 0;
+}
+
+static inline void
+io_setup_async_state(struct io_rw_state *async_s, const struct io_rw_state *s)
+{
+	unsigned int iov_off = 0;
+
+	async_s->iter.iov = async_s->fast_iov;
+	if (s->iter.iov != s->fast_iov) {
+		iov_off = s->iter.iov - s->fast_iov;
+		async_s->iter.iov += iov_off;
+	}
+	if (async_s->fast_iov != s->fast_iov) {
+		memcpy(async_s->fast_iov + iov_off, s->fast_iov + iov_off,
+			       sizeof(struct iovec) * s->iter.nr_segs);
+	}
+}
+
+static int
+io_setup_async_rw_pi(struct io_kiocb *req, struct iovec * const *iovec,
+			struct io_rw_state *s_data,
+			struct __io_rw_pi_state *s_pi)
+{
+	struct io_async_rw_pi *arw;
+
+	if (req_has_async_data(req))
+		return 0;
+
+	if (io_alloc_async_data(req))
+		return -ENOMEM;
+
+	arw = req->async_data;
+
+	/* data */
+	arw->data.s.iter = s_data->iter;
+	arw->data.free_iovec = iovec[DATA];
+	arw->data.bytes_done = 0;
+
+	if (iovec[DATA])
+		req->flags |= REQ_F_NEED_CLEANUP;
+	else
+		io_setup_async_state(&arw->data.s, s_data);
+
+	/* pi */
+	arw->pi.s.iter = s_pi->iter;
+	arw->pi.free_iovec = iovec[PI];
+
+	if (iovec[PI])
+		req->flags |= REQ_F_NEED_CLEANUP;
+	else {
+		io_setup_async_state((struct io_rw_state *)&arw->pi.s,
+					(const struct io_rw_state *)s_pi);
+	}
+
+	/* save states */
+	io_rw_pi_state_iter_save(&arw->data.s, &arw->pi.s);
+
+	return 0;
+}
+
+static inline int io_rw_pi_prep_async(struct io_kiocb *req, int io_dir)
+{
+	int ret = 0;
+	struct io_async_rw_pi *arw = req->async_data;
+	struct iovec *iovec[2];
+
+	ret = io_import_iovecs_pi(io_dir, req, iovec,
+					&arw->data.s, &arw->pi.s);
+	if (unlikely(ret < 0))
+		return ret;
+
+	arw->data.bytes_done = 0;
+	arw->data.free_iovec = iovec[DATA];
+	arw->pi.free_iovec = iovec[PI];
+
+	if (iovec[DATA] || iovec[PI])
+		req->flags |= REQ_F_NEED_CLEANUP;
+
+	return 0;
+}
+
+
+int io_readv_pi_prep_async(struct io_kiocb *req)
+{
+	return io_rw_pi_prep_async(req, READ);
+}
+
+int io_writev_pi_prep_async(struct io_kiocb *req)
+{
+	return io_rw_pi_prep_async(req, WRITE);
+}
+
+static int io_rw_pi_init_file(struct io_kiocb *req, fmode_t mode)
+{
+	struct kiocb *kiocb = io_kiocb_to_kiocb(req, struct io_rw_pi);
+	struct io_ring_ctx *ctx = req->ctx;
+	struct file *file = req->file;
+	int flags;
+
+	if (unlikely(!file || !(file->f_mode & mode)))
+		return -EBADF;
+
+	if (unlikely(!S_ISBLK(file_inode(req->file)->i_mode)))
+		return -EINVAL;
+
+	if (unlikely(!(file->f_flags & O_DIRECT)))
+		return -EINVAL;
+
+	if (!io_req_ffs_set(req))
+		req->flags |= io_file_get_flags(file) << REQ_F_SUPPORT_NOWAIT_BIT;
+
+	flags = kiocb->ki_flags;
+	kiocb->ki_flags = iocb_flags(file);
+	kiocb->ki_flags |= flags;
+
+	/*
+	 * If the file is marked O_NONBLOCK, still allow retry for it if it
+	 * supports async. Otherwise it's impossible to use O_NONBLOCK files
+	 * reliably. If not, or if IOCB_NOWAIT is set, don't retry.
+	 */
+	if ((kiocb->ki_flags & IOCB_NOWAIT) ||
+	    ((file->f_flags & O_NONBLOCK) && !io_file_supports_nowait(req)))
+		req->flags |= REQ_F_NOWAIT;
+
+	if (ctx->flags & IORING_SETUP_IOPOLL) {
+		if (!(kiocb->ki_flags & IOCB_DIRECT) || !file->f_op->iopoll)
+			return -EOPNOTSUPP;
+
+		kiocb->ki_flags |= IOCB_HIPRI | IOCB_ALLOC_CACHE;
+		kiocb->ki_complete = io_complete_rw_iopoll;
+		req->iopoll_completed = 0;
+	} else {
+		if (kiocb->ki_flags & IOCB_HIPRI)
+			return -EINVAL;
+		kiocb->ki_complete = io_complete_rw;
+	}
+
+	return 0;
+}
+
+void io_readv_writev_pi_cleanup(struct io_kiocb *req)
+{
+	struct io_async_rw_pi *arw = req->async_data;
+
+	kfree(arw->data.free_iovec);
+	kfree(arw->pi.free_iovec);
+}
+
+int io_readv_pi(struct io_kiocb *req, unsigned int issue_flags)
+{
+	struct io_rw_pi_state s;
+	struct io_rw_state *s_data;
+	struct __io_rw_pi_state *s_pi;
+	struct iovec *iovec[2];
+	struct kiocb *kiocb = io_kiocb_to_kiocb(req, struct io_rw_pi);
+	bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
+	ssize_t ret;
+	loff_t *ppos;
+
+	if (!req_has_async_data(req)) {
+		s_data = &s.data;
+		s_pi = &s.pi;
+		ret = io_import_iovecs_pi(READ, req, iovec, s_data, s_pi);
+		if (unlikely(ret < 0))
+			return ret;
+	} else {
+		struct io_async_rw_pi *arw = req->async_data;
+
+		iovec[DATA] = iovec[PI] = NULL;
+		s_data = &arw->data.s;
+		s_pi = &arw->pi.s;
+		io_rw_pi_state_iter_restore(s_data, s_pi);
+	}
+	kiocb->private = &s_pi->iter;
+
+	ret = io_rw_pi_init_file(req, FMODE_READ);
+	if (unlikely(ret))
+		goto out_free;
+
+	req->cqe.res = iov_iter_count(&s_data->iter);
+	if (force_nonblock) {
+		/* If the file doesn't support async, just async punt */
+		if (unlikely(!io_file_supports_nowait(req))) {
+			ret = io_setup_async_rw_pi(req, iovec, s_data, s_pi);
+			return ret ?: -EAGAIN;
+		}
+		kiocb->ki_flags |= IOCB_NOWAIT;
+	} else {
+		/* Ensure we clear previously set non-block flag */
+		kiocb->ki_flags &= ~IOCB_NOWAIT;
+	}
+
+	ppos = io_kiocb_update_pos(req);
+
+	ret = rw_verify_area(READ, req->file, ppos, req->cqe.res);
+	if (unlikely(ret))
+		goto out_free;
+
+	if (likely(req->file->f_op->read_iter))
+		ret = call_read_iter(req->file, kiocb, &s_data->iter);
+	else
+		ret = -EINVAL;
+
+	if (ret == -EAGAIN || (req->flags & REQ_F_REISSUE)) {
+		req->flags &= ~REQ_F_REISSUE;
+
+		/* IOPOLL retry should happen for io-wq threads */
+		if (!force_nonblock && !(req->ctx->flags & IORING_SETUP_IOPOLL))
+			goto done;
+		/* no retry on NONBLOCK nor RWF_NOWAIT */
+		if (req->flags & REQ_F_NOWAIT)
+			goto done;
+		ret = 0;
+	} else if (ret == -EIOCBQUEUED) {
+		ret = IOU_ISSUE_SKIP_COMPLETE;
+		goto out_free;
+	}
+
+done:
+	/* it's faster to check here than to delegate to kfree */
+	if (iovec[DATA])
+		kfree(iovec[DATA]);
+	if (iovec[PI])
+		kfree(iovec[PI]);
+	return kiocb_done(req, ret, issue_flags);
+out_free:
+	if (iovec[DATA])
+		kfree(iovec[DATA]);
+	if (iovec[PI])
+		kfree(iovec[PI]);
+	return ret;
+}
+
+int io_writev_pi(struct io_kiocb *req, unsigned int issue_flags)
+{
+	struct io_rw_pi_state s;
+	struct io_rw_state *s_data;
+	struct __io_rw_pi_state *s_pi;
+	struct iovec *iovec[2];
+	struct kiocb *kiocb = io_kiocb_to_kiocb(req, struct io_rw_pi);
+	bool force_nonblock = issue_flags & IO_URING_F_NONBLOCK;
+	ssize_t ret, ret2;
+	loff_t *ppos;
+
+	if (!req_has_async_data(req)) {
+		s_data = &s.data;
+		s_pi = &s.pi;
+		ret = io_import_iovecs_pi(WRITE, req, iovec, s_data, s_pi);
+		if (unlikely(ret < 0))
+			return ret;
+	} else {
+		struct io_async_rw_pi *arw = req->async_data;
+
+		iovec[DATA] = iovec[PI] = NULL;
+		s_data = &arw->data.s;
+		s_pi = &arw->pi.s;
+		io_rw_pi_state_iter_restore(s_data, s_pi);
+	}
+	kiocb->private = &s_pi->iter;
+
+	ret = io_rw_pi_init_file(req, FMODE_WRITE);
+	if (unlikely(ret))
+		goto out_free;
+
+	req->cqe.res = iov_iter_count(&s_data->iter);
+
+	if (force_nonblock) {
+		/* If the file doesn't support async, just async punt */
+		if (unlikely(!io_file_supports_nowait(req)))
+			goto copy_iov;
+
+		kiocb->ki_flags |= IOCB_NOWAIT;
+	} else {
+		/* Ensure we clear previously set non-block flag */
+		kiocb->ki_flags &= ~IOCB_NOWAIT;
+	}
+
+	ppos = io_kiocb_update_pos(req);
+
+	ret = rw_verify_area(WRITE, req->file, ppos, req->cqe.res);
+	if (unlikely(ret))
+		goto out_free;
+
+	kiocb->ki_flags |= IOCB_WRITE;
+
+	if (likely(req->file->f_op->write_iter))
+		ret2 = call_write_iter(req->file, kiocb, &s_data->iter);
+	else
+		ret2 = -EINVAL;
+
+	if (req->flags & REQ_F_REISSUE) {
+		req->flags &= ~REQ_F_REISSUE;
+		ret2 = -EAGAIN;
+	}
+
+	/*
+	 * Raw bdev writes will return -EOPNOTSUPP for IOCB_NOWAIT. Just
+	 * retry them without IOCB_NOWAIT.
+	 */
+	if (ret2 == -EOPNOTSUPP && (kiocb->ki_flags & IOCB_NOWAIT))
+		ret2 = -EAGAIN;
+	/* no retry on NONBLOCK nor RWF_NOWAIT */
+	if (ret2 == -EAGAIN && (req->flags & REQ_F_NOWAIT))
+		goto done;
+	if (!force_nonblock || ret2 != -EAGAIN) {
+		if (ret2 == -EIOCBQUEUED) {
+			ret = IOU_ISSUE_SKIP_COMPLETE;
+			goto out_free;
+		}
+		/* IOPOLL retry should happen for io-wq threads */
+		if (ret2 == -EAGAIN && (req->ctx->flags & IORING_SETUP_IOPOLL))
+			goto copy_iov;
+
+done:
+		ret = kiocb_done(req, ret2, issue_flags);
+	} else {
+copy_iov:
+		io_rw_pi_state_iter_restore(s_data, s_pi);
+		ret = io_setup_async_rw_pi(req, iovec, s_data, s_pi);
+		return ret ?: -EAGAIN;
+	}
+out_free:
+	/* it's reportedly faster than delegating the null check to kfree() */
+	if (iovec[DATA])
+		kfree(iovec[DATA]);
+	if (iovec[PI])
+		kfree(iovec[PI]);
+	return ret;
+}
+
diff --git a/io_uring/rw_pi.h b/io_uring/rw_pi.h
new file mode 100644
index 000000000000..f635da982484
--- /dev/null
+++ b/io_uring/rw_pi.h
@@ -0,0 +1,34 @@ 
+// SPDX-License-Identifier: GPL-2.0
+#ifndef IOU_RW_PI_H
+#define IOU_RW_PI_H
+
+#include "rw.h"
+
+struct __io_rw_pi_state {
+	struct iov_iter			iter;
+	struct iov_iter_state		iter_state;
+	struct iovec			fast_iov[UIO_FASTIOV_PI];
+};
+
+struct io_rw_pi_state {
+	struct io_rw_state		data;
+	struct __io_rw_pi_state		pi;
+};
+
+struct __io_async_rw_pi {
+	const struct iovec		*free_iovec;
+	struct __io_rw_pi_state		s;
+};
+
+struct io_async_rw_pi {
+	struct io_async_rw		data;
+	struct __io_async_rw_pi		pi;
+};
+
+int io_prep_rw_pi(struct io_kiocb *req, const struct io_uring_sqe *sqe);
+int io_readv_pi(struct io_kiocb *req, unsigned int issue_flags);
+int io_readv_pi_prep_async(struct io_kiocb *req);
+int io_writev_pi(struct io_kiocb *req, unsigned int issue_flags);
+int io_writev_pi_prep_async(struct io_kiocb *req);
+void io_readv_writev_pi_cleanup(struct io_kiocb *req);
+#endif