
[08/10] io_uring/rw: add support to send meta along with read/write

Message ID 20240425183943.6319-9-joshi.k@samsung.com
State New
Series Read/Write with meta/integrity

Commit Message

Kanchan Joshi April 25, 2024, 6:39 p.m. UTC
From: Anuj Gupta <anuj20.g@samsung.com>

This patch introduces the IORING_OP_READ_META and IORING_OP_WRITE_META
opcodes, which allow sending a meta buffer along with a read/write. The
meta buffer, its length, the apptag and the integrity check flags can be
specified by the application in the newly introduced meta_addr, meta_len,
apptag and meta_flags fields of the SQE.

Use the user-passed information to prepare a uio_meta descriptor, and
pass it down using kiocb->private.

Meta exchange is supported only for direct IO.
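
For illustration, a minimal userspace sketch (not part of the patch) of
issuing such a read; it assumes the uapi additions above, a liburing ring
that is already set up, and buffers suitable for O_DIRECT:

	/* hypothetical example: 'ring', 'fd' (opened with O_DIRECT), and
	 * suitably aligned 'data_buf'/'meta_buf' are assumed to exist
	 */
	struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);

	io_uring_prep_read(sqe, fd, data_buf, data_len, offset);
	sqe->opcode     = IORING_OP_READ_META;        /* meta-capable READ */
	sqe->meta_addr  = (__u64)(uintptr_t)meta_buf; /* integrity buffer */
	sqe->meta_len   = meta_len;
	sqe->meta_flags = META_CHK_GUARD | META_CHK_REFTAG;
	sqe->apptag     = 0;

	io_uring_submit(&ring);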

Signed-off-by: Anuj Gupta <anuj20.g@samsung.com>
Signed-off-by: Kanchan Joshi <joshi.k@samsung.com>
Signed-off-by: Nitesh Shetty <nj.shetty@samsung.com>
---
 include/linux/fs.h            |  1 +
 include/uapi/linux/io_uring.h | 15 +++++++
 io_uring/io_uring.c           |  4 ++
 io_uring/opdef.c              | 30 ++++++++++++++
 io_uring/rw.c                 | 76 +++++++++++++++++++++++++++++++++--
 io_uring/rw.h                 | 11 ++++-
 6 files changed, 132 insertions(+), 5 deletions(-)

Comments

Jens Axboe April 26, 2024, 2:25 p.m. UTC | #1
> diff --git a/io_uring/rw.c b/io_uring/rw.c
> index 3134a6ece1be..b2c9ac91d5e5 100644
> --- a/io_uring/rw.c
> +++ b/io_uring/rw.c
> @@ -587,6 +623,8 @@ static int kiocb_done(struct io_kiocb *req, ssize_t ret,
>  
>  		req->flags &= ~REQ_F_REISSUE;
>  		iov_iter_restore(&io->iter, &io->iter_state);
> +		if (unlikely(rw->kiocb.ki_flags & IOCB_USE_META))
> +			iov_iter_restore(&io->meta.iter, &io->iter_meta_state);
>  		return -EAGAIN;
>  	}
>  	return IOU_ISSUE_SKIP_COMPLETE;
This puzzles me a bit, why is the restore now dependent on
IOCB_USE_META?

> @@ -768,7 +806,7 @@ static int io_rw_init_file(struct io_kiocb *req, fmode_t mode)
>  	if (!(req->flags & REQ_F_FIXED_FILE))
>  		req->flags |= io_file_get_flags(file);
>  
> -	kiocb->ki_flags = file->f_iocb_flags;
> +	kiocb->ki_flags |= file->f_iocb_flags;
>  	ret = kiocb_set_rw_flags(kiocb, rw->flags);
>  	if (unlikely(ret))
>  		return ret;
> @@ -787,7 +825,8 @@ static int io_rw_init_file(struct io_kiocb *req, fmode_t mode)
>  		if (!(kiocb->ki_flags & IOCB_DIRECT) || !file->f_op->iopoll)
>  			return -EOPNOTSUPP;
>  
> -		kiocb->private = NULL;
> +		if (likely(!(kiocb->ki_flags & IOCB_USE_META)))
> +			kiocb->private = NULL;
>  		kiocb->ki_flags |= IOCB_HIPRI;
>  		kiocb->ki_complete = io_complete_rw_iopoll;
>  		req->iopoll_completed = 0;

Why don't we just set ->private generically earlier, eg like we do for
the ki_flags, rather than have it be a branch in here?
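
For reference, a rough sketch of what that suggestion might look like (not
a tested change; it relies on IOCB_USE_META already being set at prep time,
which this patch does via io_prep_rw_meta()):

	struct io_async_rw *io = req->async_data;

	kiocb->ki_flags |= file->f_iocb_flags;
	/* sketch: pick ->private generically, next to the ki_flags setup */
	kiocb->private = (kiocb->ki_flags & IOCB_USE_META) ? &io->meta : NULL;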

> @@ -853,7 +892,8 @@ static int __io_read(struct io_kiocb *req, unsigned int issue_flags)
>  	} else if (ret == -EIOCBQUEUED) {
>  		return IOU_ISSUE_SKIP_COMPLETE;
>  	} else if (ret == req->cqe.res || ret <= 0 || !force_nonblock ||
> -		   (req->flags & REQ_F_NOWAIT) || !need_complete_io(req)) {
> +		   (req->flags & REQ_F_NOWAIT) || !need_complete_io(req) ||
> +		   (kiocb->ki_flags & IOCB_USE_META)) {
>  		/* read all, failed, already did sync or don't want to retry */
>  		goto done;
>  	}

Would it be cleaner to stuff that IOCB_USE_META check in
need_complete_io(), as that would seem to more closely describe why that
check is there in the first place? With a comment.
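
Something along these lines, as a rough sketch of the suggestion (only the
IOCB_USE_META check and the comment would be new; the rest mirrors the
existing helper):

	static bool need_complete_io(struct io_kiocb *req)
	{
		struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);

		/* requests carrying a meta buffer don't do partial retries */
		if (rw->kiocb.ki_flags & IOCB_USE_META)
			return false;

		return req->flags & REQ_F_ISREG ||
			S_ISBLK(file_inode(req->file)->i_mode);
	}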

> @@ -864,6 +904,12 @@ static int __io_read(struct io_kiocb *req, unsigned int issue_flags)
>  	 * manually if we need to.
>  	 */
>  	iov_iter_restore(&io->iter, &io->iter_state);
> +	if (unlikely(kiocb->ki_flags & IOCB_USE_META)) {
> +		/* don't handle partial completion for read + meta */
> +		if (ret > 0)
> +			goto done;
> +		iov_iter_restore(&io->meta.iter, &io->iter_meta_state);
> +	}

Also seems a bit odd why we need this check here, surely if this is
needed other "don't do retry IOs" conditions would be the same?

> @@ -1053,7 +1099,8 @@ int io_write(struct io_kiocb *req, unsigned int issue_flags)
>  		if (ret2 == -EAGAIN && (req->ctx->flags & IORING_SETUP_IOPOLL))
>  			goto ret_eagain;
>  
> -		if (ret2 != req->cqe.res && ret2 >= 0 && need_complete_io(req)) {
> +		if (ret2 != req->cqe.res && ret2 >= 0 && need_complete_io(req)
> +				&& !(kiocb->ki_flags & IOCB_USE_META)) {
>  			trace_io_uring_short_write(req->ctx, kiocb->ki_pos - ret2,
>  						req->cqe.res, ret2);

Same here. Would be nice to integrate this a bit nicer rather than have
a bunch of "oh we also need this extra check here" conditions.

> @@ -1074,12 +1121,33 @@ int io_write(struct io_kiocb *req, unsigned int issue_flags)
>  	} else {
>  ret_eagain:
>  		iov_iter_restore(&io->iter, &io->iter_state);
> +		if (unlikely(kiocb->ki_flags & IOCB_USE_META))
> +			iov_iter_restore(&io->meta.iter, &io->iter_meta_state);
>  		if (kiocb->ki_flags & IOCB_WRITE)
>  			io_req_end_write(req);
>  		return -EAGAIN;
>  	}
>  }

Same question here on the (now) conditional restore.

> +int io_rw_meta(struct io_kiocb *req, unsigned int issue_flags)
> +{
> +	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
> +	struct io_async_rw *io = req->async_data;
> +	struct kiocb *kiocb = &rw->kiocb;
> +	int ret;
> +
> +	if (!(req->file->f_flags & O_DIRECT))
> +		return -EOPNOTSUPP;

Why isn't this just caught at init time when IOCB_DIRECT is checked?

> +	kiocb->private = &io->meta;
> +	if (req->opcode == IORING_OP_READ_META)
> +		ret = io_read(req, issue_flags);
> +	else
> +		ret = io_write(req, issue_flags);
> +
> +	return ret;
> +}

kiocb->private is a bit of an odd beast, and ownership isn't clear at
all. It would make the most sense if the owner of the kiocb (eg io_uring
in this case) owned it, but take a look at eg ocfs2 and see what they do
with it... I think this would blow up as a result.

Outside of that, and with the O_DIRECT check fixed, this should
just be two separate functions, one for read and one for write.

Kanchan Joshi April 29, 2024, 8:11 p.m. UTC | #2
On 4/26/2024 7:55 PM, Jens Axboe wrote:
>> diff --git a/io_uring/rw.c b/io_uring/rw.c
>> index 3134a6ece1be..b2c9ac91d5e5 100644
>> --- a/io_uring/rw.c
>> +++ b/io_uring/rw.c
>> @@ -587,6 +623,8 @@ static int kiocb_done(struct io_kiocb *req, ssize_t ret,
>>   
>>   		req->flags &= ~REQ_F_REISSUE;
>>   		iov_iter_restore(&io->iter, &io->iter_state);
>> +		if (unlikely(rw->kiocb.ki_flags & IOCB_USE_META))
>> +			iov_iter_restore(&io->meta.iter, &io->iter_meta_state);
>>   		return -EAGAIN;
>>   	}
>>   	return IOU_ISSUE_SKIP_COMPLETE;
> This puzzles me a bit, why is the restore now dependent on
> IOCB_USE_META?

Both the save and the restore for meta are under this condition (so it
seemed natural).
Also, to avoid growing "struct io_async_rw" too much, this patch keeps
meta/iter_meta_state in the same memory as wpq. So doing the restore
unconditionally can corrupt wpq for buffered io.
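
(The layout in question, from this patch's change to io_uring/rw.h:)

	union {
		struct wait_page_queue		wpq;
		struct {
			struct uio_meta			meta;
			struct iov_iter_state		iter_meta_state;
		};
	};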

>> @@ -768,7 +806,7 @@ static int io_rw_init_file(struct io_kiocb *req, fmode_t mode)
>>   	if (!(req->flags & REQ_F_FIXED_FILE))
>>   		req->flags |= io_file_get_flags(file);
>>   
>> -	kiocb->ki_flags = file->f_iocb_flags;
>> +	kiocb->ki_flags |= file->f_iocb_flags;
>>   	ret = kiocb_set_rw_flags(kiocb, rw->flags);
>>   	if (unlikely(ret))
>>   		return ret;
>> @@ -787,7 +825,8 @@ static int io_rw_init_file(struct io_kiocb *req, fmode_t mode)
>>   		if (!(kiocb->ki_flags & IOCB_DIRECT) || !file->f_op->iopoll)
>>   			return -EOPNOTSUPP;
>>   
>> -		kiocb->private = NULL;
>> +		if (likely(!(kiocb->ki_flags & IOCB_USE_META)))
>> +			kiocb->private = NULL;
>>   		kiocb->ki_flags |= IOCB_HIPRI;
>>   		kiocb->ki_complete = io_complete_rw_iopoll;
>>   		req->iopoll_completed = 0;
> 
> Why don't we just set ->private generically earlier, eg like we do for
> the ki_flags, rather than have it be a branch in here?

Not sure if I am missing what you have in mind, but kiocb->private was
already set before we reached this point (in io_rw_meta), so we don't
overwrite it here.

>> @@ -853,7 +892,8 @@ static int __io_read(struct io_kiocb *req, unsigned int issue_flags)
>>   	} else if (ret == -EIOCBQUEUED) {
>>   		return IOU_ISSUE_SKIP_COMPLETE;
>>   	} else if (ret == req->cqe.res || ret <= 0 || !force_nonblock ||
>> -		   (req->flags & REQ_F_NOWAIT) || !need_complete_io(req)) {
>> +		   (req->flags & REQ_F_NOWAIT) || !need_complete_io(req) ||
>> +		   (kiocb->ki_flags & IOCB_USE_META)) {
>>   		/* read all, failed, already did sync or don't want to retry */
>>   		goto done;
>>   	}
> 
> Would it be cleaner to stuff that IOCB_USE_META check in
> need_complete_io(), as that would seem to more closely describe why that
> check is there in the first place? With a comment.

Yes, will do.

>> @@ -864,6 +904,12 @@ static int __io_read(struct io_kiocb *req, unsigned int issue_flags)
>>   	 * manually if we need to.
>>   	 */
>>   	iov_iter_restore(&io->iter, &io->iter_state);
>> +	if (unlikely(kiocb->ki_flags & IOCB_USE_META)) {
>> +		/* don't handle partial completion for read + meta */
>> +		if (ret > 0)
>> +			goto done;
>> +		iov_iter_restore(&io->meta.iter, &io->iter_meta_state);
>> +	}
> 
> Also seems a bit odd why we need this check here, surely if this is
> needed other "don't do retry IOs" conditions would be the same?

Yes, will revisit.
>> @@ -1053,7 +1099,8 @@ int io_write(struct io_kiocb *req, unsigned int issue_flags)
>>   		if (ret2 == -EAGAIN && (req->ctx->flags & IORING_SETUP_IOPOLL))
>>   			goto ret_eagain;
>>   
>> -		if (ret2 != req->cqe.res && ret2 >= 0 && need_complete_io(req)) {
>> +		if (ret2 != req->cqe.res && ret2 >= 0 && need_complete_io(req)
>> +				&& !(kiocb->ki_flags & IOCB_USE_META)) {
>>   			trace_io_uring_short_write(req->ctx, kiocb->ki_pos - ret2,
>>   						req->cqe.res, ret2);
> 
> Same here. Would be nice to integrate this a bit nicer rather than have
> a bunch of "oh we also need this extra check here" conditions.

Will look into this too.
>> @@ -1074,12 +1121,33 @@ int io_write(struct io_kiocb *req, unsigned int issue_flags)
>>   	} else {
>>   ret_eagain:
>>   		iov_iter_restore(&io->iter, &io->iter_state);
>> +		if (unlikely(kiocb->ki_flags & IOCB_USE_META))
>> +			iov_iter_restore(&io->meta.iter, &io->iter_meta_state);
>>   		if (kiocb->ki_flags & IOCB_WRITE)
>>   			io_req_end_write(req);
>>   		return -EAGAIN;
>>   	}
>>   }
> 
> Same question here on the (now) conditional restore.

I did not get the concern. Do you prefer the restore to be unconditional?

>> +int io_rw_meta(struct io_kiocb *req, unsigned int issue_flags)
>> +{
>> +	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
>> +	struct io_async_rw *io = req->async_data;
>> +	struct kiocb *kiocb = &rw->kiocb;
>> +	int ret;
>> +
>> +	if (!(req->file->f_flags & O_DIRECT))
>> +		return -EOPNOTSUPP;
> 
> Why isn't this just caught at init time when IOCB_DIRECT is checked?

io_rw_init_file() gets invoked after this, and the IOCB_DIRECT check there
applies only to the IOPOLL case. We want to check/fail this regardless of
IOPOLL.

> 
>> +	kiocb->private = &io->meta;
>> +	if (req->opcode == IORING_OP_READ_META)
>> +		ret = io_read(req, issue_flags);
>> +	else
>> +		ret = io_write(req, issue_flags);
>> +
>> +	return ret;
>> +}
> 
> kiocb->private is a bit of an odd beast, and ownership isn't clear at
> all. It would make the most sense if the owner of the kiocb (eg io_uring
> in this case) owned it, but take a look at eg ocfs2 and see what they do
> with it... I think this would blow up as a result.

Yes, ocfs2 is making use of kiocb->private. But that seems fine. In
io_uring we use the field only to send the information down. ocfs2 (or
anything else unaware of this interface) may simply overwrite
kiocb->private.
If the lower layer wants to support meta exchange, it is supposed to
extract the meta descriptor from kiocb->private before altering it.

This is the same for the block direct IO path too, when we are doing
polled io.
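
As a rough sketch of that expectation (an assumption about a meta-aware
lower layer; the actual consumer is added elsewhere in this series), the
->read_iter/->write_iter side would claim the descriptor before reusing
the field:

	struct uio_meta *meta = NULL;

	if (iocb->ki_flags & IOCB_USE_META) {
		meta = iocb->private;	/* take ownership of the descriptor */
		iocb->private = NULL;	/* now free for the lower layer's use */
	}
	/* ... later, attach meta->iter as the integrity payload ... */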

Patch

diff --git a/include/linux/fs.h b/include/linux/fs.h
index 8dfd53b52744..8868d17ae8f9 100644
--- a/include/linux/fs.h
+++ b/include/linux/fs.h
@@ -329,6 +329,7 @@  struct readahead_control;
 #define IOCB_NOIO		(1 << 20)
 /* can use bio alloc cache */
 #define IOCB_ALLOC_CACHE	(1 << 21)
+#define IOCB_USE_META		(1 << 22)
 /*
  * IOCB_DIO_CALLER_COMP can be set by the iocb owner, to indicate that the
  * iocb completion can be passed back to the owner for execution from a safe
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index a7f847543a7f..d4653b52fdd6 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -97,6 +97,12 @@  struct io_uring_sqe {
 			__u64	addr3;
 			__u64	__pad2[1];
 		};
+		struct {
+			__u64	meta_addr;
+			__u32	meta_len;
+			__u16	meta_flags;
+			__u16	apptag;
+		};
 		__u64	optval;
 		/*
 		 * If the ring is initialized with IORING_SETUP_SQE128, then
@@ -106,6 +112,13 @@  struct io_uring_sqe {
 	};
 };
 
+/*
+ * meta io flags
+ */
+#define META_CHK_GUARD	(1U << 0)	/* guard is valid */
+#define META_CHK_APPTAG	(1U << 1)	/* app tag is valid */
+#define META_CHK_REFTAG	(1U << 2)	/* ref tag is valid */
+
 /*
  * If sqe->file_index is set to this for opcodes that instantiate a new
  * direct descriptor (like openat/openat2/accept), then io_uring will allocate
@@ -256,6 +269,8 @@  enum io_uring_op {
 	IORING_OP_FUTEX_WAITV,
 	IORING_OP_FIXED_FD_INSTALL,
 	IORING_OP_FTRUNCATE,
+	IORING_OP_READ_META,
+	IORING_OP_WRITE_META,
 
 	/* this goes last, obviously */
 	IORING_OP_LAST,
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index 3c9087f37c43..af95fc8d988c 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -3723,7 +3723,11 @@  static int __init io_uring_init(void)
 	BUILD_BUG_SQE_ELEM(44, __u16,  addr_len);
 	BUILD_BUG_SQE_ELEM(46, __u16,  __pad3[0]);
 	BUILD_BUG_SQE_ELEM(48, __u64,  addr3);
+	BUILD_BUG_SQE_ELEM(48, __u64,  meta_addr);
 	BUILD_BUG_SQE_ELEM_SIZE(48, 0, cmd);
+	BUILD_BUG_SQE_ELEM(56, __u32,  meta_len);
+	BUILD_BUG_SQE_ELEM(60, __u16,  meta_flags);
+	BUILD_BUG_SQE_ELEM(62, __u16,  apptag);
 	BUILD_BUG_SQE_ELEM(56, __u64,  __pad2);
 
 	BUILD_BUG_ON(sizeof(struct io_uring_files_update) !=
diff --git a/io_uring/opdef.c b/io_uring/opdef.c
index a16f73938ebb..8b8fdcfb7f30 100644
--- a/io_uring/opdef.c
+++ b/io_uring/opdef.c
@@ -444,6 +444,28 @@  const struct io_issue_def io_issue_defs[] = {
 		.prep			= io_eopnotsupp_prep,
 #endif
 	},
+	[IORING_OP_READ_META] = {
+		.needs_file		= 1,
+		.plug			= 1,
+		.audit_skip		= 1,
+		.ioprio			= 1,
+		.iopoll			= 1,
+		.iopoll_queue		= 1,
+		.async_size		= sizeof(struct io_async_rw),
+		.prep			= io_prep_read_meta,
+		.issue			= io_rw_meta,
+	},
+	[IORING_OP_WRITE_META] = {
+		.needs_file		= 1,
+		.plug			= 1,
+		.audit_skip		= 1,
+		.ioprio			= 1,
+		.iopoll			= 1,
+		.iopoll_queue		= 1,
+		.async_size		= sizeof(struct io_async_rw),
+		.prep			= io_prep_write_meta,
+		.issue			= io_rw_meta,
+	},
 	[IORING_OP_READ_MULTISHOT] = {
 		.needs_file		= 1,
 		.unbound_nonreg_file	= 1,
@@ -510,6 +532,14 @@  const struct io_cold_def io_cold_defs[] = {
 		.cleanup		= io_readv_writev_cleanup,
 		.fail			= io_rw_fail,
 	},
+	[IORING_OP_READ_META] = {
+		.name			= "READ_META",
+		.fail			= io_rw_fail,
+	},
+	[IORING_OP_WRITE_META] = {
+		.name			= "WRITE_META",
+		.fail			= io_rw_fail,
+	},
 	[IORING_OP_FSYNC] = {
 		.name			= "FSYNC",
 	},
diff --git a/io_uring/rw.c b/io_uring/rw.c
index 3134a6ece1be..b2c9ac91d5e5 100644
--- a/io_uring/rw.c
+++ b/io_uring/rw.c
@@ -269,6 +269,7 @@  static int io_prep_rw(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 		rw->kiocb.ki_ioprio = get_current_ioprio();
 	}
 	rw->kiocb.dio_complete = NULL;
+	rw->kiocb.ki_flags = 0;
 
 	rw->addr = READ_ONCE(sqe->addr);
 	rw->len = READ_ONCE(sqe->len);
@@ -286,6 +287,41 @@  int io_prep_write(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 	return io_prep_rw(req, sqe, ITER_SOURCE, true);
 }
 
+static int io_prep_rw_meta(struct io_kiocb *req, const struct io_uring_sqe *sqe,
+		    int ddir, bool import)
+{
+	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
+	struct io_async_rw *io;
+	struct kiocb *kiocb = &rw->kiocb;
+	int ret;
+
+	ret = io_prep_rw(req, sqe, ddir, import);
+	if (unlikely(ret))
+		return ret;
+
+	io = req->async_data;
+	kiocb->ki_flags |= IOCB_USE_META;
+	io->meta.flags = READ_ONCE(sqe->meta_flags);
+	io->meta.apptag = READ_ONCE(sqe->apptag);
+	ret = import_ubuf(ddir, u64_to_user_ptr(READ_ONCE(sqe->meta_addr)),
+			     READ_ONCE(sqe->meta_len), &io->meta.iter);
+	if (unlikely(ret < 0))
+		return ret;
+
+	iov_iter_save_state(&io->meta.iter, &io->iter_meta_state);
+	return 0;
+}
+
+int io_prep_read_meta(struct io_kiocb *req, const struct io_uring_sqe *sqe)
+{
+	return io_prep_rw_meta(req, sqe, ITER_DEST, true);
+}
+
+int io_prep_write_meta(struct io_kiocb *req, const struct io_uring_sqe *sqe)
+{
+	return io_prep_rw_meta(req, sqe, ITER_SOURCE, true);
+}
+
 static int io_prep_rwv(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 		       int ddir)
 {
@@ -587,6 +623,8 @@  static int kiocb_done(struct io_kiocb *req, ssize_t ret,
 
 		req->flags &= ~REQ_F_REISSUE;
 		iov_iter_restore(&io->iter, &io->iter_state);
+		if (unlikely(rw->kiocb.ki_flags & IOCB_USE_META))
+			iov_iter_restore(&io->meta.iter, &io->iter_meta_state);
 		return -EAGAIN;
 	}
 	return IOU_ISSUE_SKIP_COMPLETE;
@@ -768,7 +806,7 @@  static int io_rw_init_file(struct io_kiocb *req, fmode_t mode)
 	if (!(req->flags & REQ_F_FIXED_FILE))
 		req->flags |= io_file_get_flags(file);
 
-	kiocb->ki_flags = file->f_iocb_flags;
+	kiocb->ki_flags |= file->f_iocb_flags;
 	ret = kiocb_set_rw_flags(kiocb, rw->flags);
 	if (unlikely(ret))
 		return ret;
@@ -787,7 +825,8 @@  static int io_rw_init_file(struct io_kiocb *req, fmode_t mode)
 		if (!(kiocb->ki_flags & IOCB_DIRECT) || !file->f_op->iopoll)
 			return -EOPNOTSUPP;
 
-		kiocb->private = NULL;
+		if (likely(!(kiocb->ki_flags & IOCB_USE_META)))
+			kiocb->private = NULL;
 		kiocb->ki_flags |= IOCB_HIPRI;
 		kiocb->ki_complete = io_complete_rw_iopoll;
 		req->iopoll_completed = 0;
@@ -853,7 +892,8 @@  static int __io_read(struct io_kiocb *req, unsigned int issue_flags)
 	} else if (ret == -EIOCBQUEUED) {
 		return IOU_ISSUE_SKIP_COMPLETE;
 	} else if (ret == req->cqe.res || ret <= 0 || !force_nonblock ||
-		   (req->flags & REQ_F_NOWAIT) || !need_complete_io(req)) {
+		   (req->flags & REQ_F_NOWAIT) || !need_complete_io(req) ||
+		   (kiocb->ki_flags & IOCB_USE_META)) {
 		/* read all, failed, already did sync or don't want to retry */
 		goto done;
 	}
@@ -864,6 +904,12 @@  static int __io_read(struct io_kiocb *req, unsigned int issue_flags)
 	 * manually if we need to.
 	 */
 	iov_iter_restore(&io->iter, &io->iter_state);
+	if (unlikely(kiocb->ki_flags & IOCB_USE_META)) {
+		/* don't handle partial completion for read + meta */
+		if (ret > 0)
+			goto done;
+		iov_iter_restore(&io->meta.iter, &io->iter_meta_state);
+	}
 
 	do {
 		/*
@@ -1053,7 +1099,8 @@  int io_write(struct io_kiocb *req, unsigned int issue_flags)
 		if (ret2 == -EAGAIN && (req->ctx->flags & IORING_SETUP_IOPOLL))
 			goto ret_eagain;
 
-		if (ret2 != req->cqe.res && ret2 >= 0 && need_complete_io(req)) {
+		if (ret2 != req->cqe.res && ret2 >= 0 && need_complete_io(req)
+				&& !(kiocb->ki_flags & IOCB_USE_META)) {
 			trace_io_uring_short_write(req->ctx, kiocb->ki_pos - ret2,
 						req->cqe.res, ret2);
 
@@ -1074,12 +1121,33 @@  int io_write(struct io_kiocb *req, unsigned int issue_flags)
 	} else {
 ret_eagain:
 		iov_iter_restore(&io->iter, &io->iter_state);
+		if (unlikely(kiocb->ki_flags & IOCB_USE_META))
+			iov_iter_restore(&io->meta.iter, &io->iter_meta_state);
 		if (kiocb->ki_flags & IOCB_WRITE)
 			io_req_end_write(req);
 		return -EAGAIN;
 	}
 }
 
+int io_rw_meta(struct io_kiocb *req, unsigned int issue_flags)
+{
+	struct io_rw *rw = io_kiocb_to_cmd(req, struct io_rw);
+	struct io_async_rw *io = req->async_data;
+	struct kiocb *kiocb = &rw->kiocb;
+	int ret;
+
+	if (!(req->file->f_flags & O_DIRECT))
+		return -EOPNOTSUPP;
+
+	kiocb->private = &io->meta;
+	if (req->opcode == IORING_OP_READ_META)
+		ret = io_read(req, issue_flags);
+	else
+		ret = io_write(req, issue_flags);
+
+	return ret;
+}
+
 void io_rw_fail(struct io_kiocb *req)
 {
 	int res;
diff --git a/io_uring/rw.h b/io_uring/rw.h
index 3f432dc75441..a640071064e3 100644
--- a/io_uring/rw.h
+++ b/io_uring/rw.h
@@ -9,7 +9,13 @@  struct io_async_rw {
 	struct iovec			fast_iov;
 	struct iovec			*free_iovec;
 	int				free_iov_nr;
-	struct wait_page_queue		wpq;
+	union {
+		struct wait_page_queue		wpq;
+		struct {
+			struct uio_meta			meta;
+			struct iov_iter_state		iter_meta_state;
+		};
+	};
 };
 
 int io_prep_read_fixed(struct io_kiocb *req, const struct io_uring_sqe *sqe);
@@ -17,9 +23,12 @@  int io_prep_write_fixed(struct io_kiocb *req, const struct io_uring_sqe *sqe);
 int io_prep_readv(struct io_kiocb *req, const struct io_uring_sqe *sqe);
 int io_prep_writev(struct io_kiocb *req, const struct io_uring_sqe *sqe);
 int io_prep_read(struct io_kiocb *req, const struct io_uring_sqe *sqe);
+int io_prep_read_meta(struct io_kiocb *req, const struct io_uring_sqe *sqe);
 int io_prep_write(struct io_kiocb *req, const struct io_uring_sqe *sqe);
+int io_prep_write_meta(struct io_kiocb *req, const struct io_uring_sqe *sqe);
 int io_read(struct io_kiocb *req, unsigned int issue_flags);
 int io_write(struct io_kiocb *req, unsigned int issue_flags);
+int io_rw_meta(struct io_kiocb *req, unsigned int issue_flags);
 void io_readv_writev_cleanup(struct io_kiocb *req);
 void io_rw_fail(struct io_kiocb *req);
 void io_req_rw_complete(struct io_kiocb *req, struct io_tw_state *ts);