diff mbox series

[15/15] io_uring: support true async buffered reads, if file provides it

Message ID 20200618144355.17324-16-axboe@kernel.dk (mailing list archive)
State New, archived
Headers show
Series [01/15] block: provide plug based way of signaling forced no-wait semantics | expand

Commit Message

Jens Axboe June 18, 2020, 2:43 p.m. UTC
If the file is flagged with FMODE_BUF_RASYNC, then we don't have to punt
the buffered read to an io-wq worker. Instead we can rely on page
unlocking callbacks to support retry based async IO. This is a lot more
efficient than doing async thread offload.

The retry is done similarly to how we handle poll based retry. From
the unlock callback, we simply queue the retry to a task_work based
handler.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 fs/io_uring.c | 145 +++++++++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 137 insertions(+), 8 deletions(-)

Comments

Pavel Begunkov June 23, 2020, 12:39 p.m. UTC | #1
On 18/06/2020 17:43, Jens Axboe wrote:
> If the file is flagged with FMODE_BUF_RASYNC, then we don't have to punt
> the buffered read to an io-wq worker. Instead we can rely on page
> unlocking callbacks to support retry based async IO. This is a lot more
> efficient than doing async thread offload.
> 
> The retry is done similarly to how we handle poll based retry. From
> the unlock callback, we simply queue the retry to a task_work based
> handler.
> 
> Signed-off-by: Jens Axboe <axboe@kernel.dk>
> ---
>  fs/io_uring.c | 145 +++++++++++++++++++++++++++++++++++++++++++++++---
>  1 file changed, 137 insertions(+), 8 deletions(-)
> 
...
>  static int io_read(struct io_kiocb *req, bool force_nonblock)
>  {
>  	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
> @@ -2784,10 +2907,7 @@ static int io_read(struct io_kiocb *req, bool force_nonblock)
>  		unsigned long nr_segs = iter.nr_segs;
>  		ssize_t ret2 = 0;
>  
> -		if (req->file->f_op->read_iter)
> -			ret2 = call_read_iter(req->file, kiocb, &iter);
> -		else
> -			ret2 = loop_rw_iter(READ, req->file, kiocb, &iter);
> +		ret2 = io_iter_do_read(req, &iter);
>  
>  		/* Catch -EAGAIN return for forced non-blocking submission */
>  		if (!force_nonblock || (ret2 != -EAGAIN && ret2 != -EIO)) {
> @@ -2799,17 +2919,26 @@ static int io_read(struct io_kiocb *req, bool force_nonblock)
>  			ret = io_setup_async_rw(req, io_size, iovec,
>  						inline_vecs, &iter);
>  			if (ret)
> -				goto out_free;
> +				goto out;
>  			/* any defer here is final, must blocking retry */
>  			if (!(req->flags & REQ_F_NOWAIT) &&
>  			    !file_can_poll(req->file))
>  				req->flags |= REQ_F_MUST_PUNT;
> +			/* if we can retry, do so with the callbacks armed */
> +			if (io_rw_should_retry(req)) {
> +				ret2 = io_iter_do_read(req, &iter);
> +				if (ret2 == -EIOCBQUEUED) {
> +					goto out;
> +				} else if (ret2 != -EAGAIN) {
> +					kiocb_done(kiocb, ret2);
> +					goto out;
> +				}
> +			}
> +			kiocb->ki_flags &= ~IOCB_WAITQ;
>  			return -EAGAIN;
>  		}
>  	}
> -out_free:
> -	kfree(iovec);
> -	req->flags &= ~REQ_F_NEED_CLEANUP;

This looks fishy. For instance, if it fails early on rw_verify_area(), how would
it free yet on-stack iovec? Is it handled somehow?

> +out:
>  	return ret;
>  }
>  
>
Jens Axboe June 23, 2020, 2:38 p.m. UTC | #2
On 6/23/20 6:39 AM, Pavel Begunkov wrote:
> On 18/06/2020 17:43, Jens Axboe wrote:
>> If the file is flagged with FMODE_BUF_RASYNC, then we don't have to punt
>> the buffered read to an io-wq worker. Instead we can rely on page
>> unlocking callbacks to support retry based async IO. This is a lot more
>> efficient than doing async thread offload.
>>
>> The retry is done similarly to how we handle poll based retry. From
>> the unlock callback, we simply queue the retry to a task_work based
>> handler.
>>
>> Signed-off-by: Jens Axboe <axboe@kernel.dk>
>> ---
>>  fs/io_uring.c | 145 +++++++++++++++++++++++++++++++++++++++++++++++---
>>  1 file changed, 137 insertions(+), 8 deletions(-)
>>
> ...
>>  static int io_read(struct io_kiocb *req, bool force_nonblock)
>>  {
>>  	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
>> @@ -2784,10 +2907,7 @@ static int io_read(struct io_kiocb *req, bool force_nonblock)
>>  		unsigned long nr_segs = iter.nr_segs;
>>  		ssize_t ret2 = 0;
>>  
>> -		if (req->file->f_op->read_iter)
>> -			ret2 = call_read_iter(req->file, kiocb, &iter);
>> -		else
>> -			ret2 = loop_rw_iter(READ, req->file, kiocb, &iter);
>> +		ret2 = io_iter_do_read(req, &iter);
>>  
>>  		/* Catch -EAGAIN return for forced non-blocking submission */
>>  		if (!force_nonblock || (ret2 != -EAGAIN && ret2 != -EIO)) {
>> @@ -2799,17 +2919,26 @@ static int io_read(struct io_kiocb *req, bool force_nonblock)
>>  			ret = io_setup_async_rw(req, io_size, iovec,
>>  						inline_vecs, &iter);
>>  			if (ret)
>> -				goto out_free;
>> +				goto out;
>>  			/* any defer here is final, must blocking retry */
>>  			if (!(req->flags & REQ_F_NOWAIT) &&
>>  			    !file_can_poll(req->file))
>>  				req->flags |= REQ_F_MUST_PUNT;
>> +			/* if we can retry, do so with the callbacks armed */
>> +			if (io_rw_should_retry(req)) {
>> +				ret2 = io_iter_do_read(req, &iter);
>> +				if (ret2 == -EIOCBQUEUED) {
>> +					goto out;
>> +				} else if (ret2 != -EAGAIN) {
>> +					kiocb_done(kiocb, ret2);
>> +					goto out;
>> +				}
>> +			}
>> +			kiocb->ki_flags &= ~IOCB_WAITQ;
>>  			return -EAGAIN;
>>  		}
>>  	}
>> -out_free:
>> -	kfree(iovec);
>> -	req->flags &= ~REQ_F_NEED_CLEANUP;
> 
> This looks fishy. For instance, if it fails early on rw_verify_area(), how would
> it free yet on-stack iovec? Is it handled somehow?

This was tweaked and rebased on top of the REQ_F_NEED_CLEANUP change,
it should be correct in the tree:

https://git.kernel.dk/cgit/linux-block/tree/fs/io_uring.c?h=for-5.9/io_uring#n2908
diff mbox series

Patch

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 40413fb9d07b..94282be1c413 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -78,6 +78,7 @@ 
 #include <linux/fs_struct.h>
 #include <linux/splice.h>
 #include <linux/task_work.h>
+#include <linux/pagemap.h>
 
 #define CREATE_TRACE_POINTS
 #include <trace/events/io_uring.h>
@@ -503,6 +504,8 @@  struct io_async_rw {
 	struct iovec			*iov;
 	ssize_t				nr_segs;
 	ssize_t				size;
+	struct wait_page_queue		wpq;
+	struct callback_head		task_work;
 };
 
 struct io_async_ctx {
@@ -2750,6 +2753,126 @@  static int io_read_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe,
 	return 0;
 }
 
+static void __io_async_buf_error(struct io_kiocb *req, int error)
+{
+	struct io_ring_ctx *ctx = req->ctx;
+
+	spin_lock_irq(&ctx->completion_lock);
+	io_cqring_fill_event(req, error);
+	io_commit_cqring(ctx);
+	spin_unlock_irq(&ctx->completion_lock);
+
+	io_cqring_ev_posted(ctx);
+	req_set_fail_links(req);
+	io_double_put_req(req);
+}
+
+static void io_async_buf_cancel(struct callback_head *cb)
+{
+	struct io_async_rw *rw;
+	struct io_kiocb *req;
+
+	rw = container_of(cb, struct io_async_rw, task_work);
+	req = rw->wpq.wait.private;
+	__io_async_buf_error(req, -ECANCELED);
+}
+
+static void io_async_buf_retry(struct callback_head *cb)
+{
+	struct io_async_rw *rw;
+	struct io_ring_ctx *ctx;
+	struct io_kiocb *req;
+
+	rw = container_of(cb, struct io_async_rw, task_work);
+	req = rw->wpq.wait.private;
+	ctx = req->ctx;
+
+	__set_current_state(TASK_RUNNING);
+	if (!io_sq_thread_acquire_mm(ctx, req)) {
+		mutex_lock(&ctx->uring_lock);
+		__io_queue_sqe(req, NULL);
+		mutex_unlock(&ctx->uring_lock);
+	} else {
+		__io_async_buf_error(req, -EFAULT);
+	}
+}
+
+static int io_async_buf_func(struct wait_queue_entry *wait, unsigned mode,
+			     int sync, void *arg)
+{
+	struct wait_page_queue *wpq;
+	struct io_kiocb *req = wait->private;
+	struct io_async_rw *rw = &req->io->rw;
+	struct wait_page_key *key = arg;
+	struct task_struct *tsk;
+	int ret;
+
+	wpq = container_of(wait, struct wait_page_queue, wait);
+
+	ret = wake_page_match(wpq, key);
+	if (ret != 1)
+		return ret;
+
+	list_del_init(&wait->entry);
+
+	init_task_work(&rw->task_work, io_async_buf_retry);
+	/* submit ref gets dropped, acquire a new one */
+	refcount_inc(&req->refs);
+	tsk = req->task;
+	ret = task_work_add(tsk, &rw->task_work, true);
+	if (unlikely(ret)) {
+		/* queue just for cancelation */
+		init_task_work(&rw->task_work, io_async_buf_cancel);
+		tsk = io_wq_get_task(req->ctx->io_wq);
+		task_work_add(tsk, &rw->task_work, true);
+	}
+	wake_up_process(tsk);
+	return 1;
+}
+
+static bool io_rw_should_retry(struct io_kiocb *req)
+{
+	struct kiocb *kiocb = &req->rw.kiocb;
+	int ret;
+
+	/* never retry for NOWAIT, we just complete with -EAGAIN */
+	if (req->flags & REQ_F_NOWAIT)
+		return false;
+
+	/* already tried, or we're doing O_DIRECT */
+	if (kiocb->ki_flags & (IOCB_DIRECT | IOCB_WAITQ))
+		return false;
+	/*
+	 * just use poll if we can, and don't attempt if the fs doesn't
+	 * support callback based unlocks
+	 */
+	if (file_can_poll(req->file) || !(req->file->f_mode & FMODE_BUF_RASYNC))
+		return false;
+
+	/*
+	 * If request type doesn't require req->io to defer in general,
+	 * we need to allocate it here
+	 */
+	if (!req->io && __io_alloc_async_ctx(req))
+		return false;
+
+	ret = kiocb_wait_page_queue_init(kiocb, &req->io->rw.wpq,
+						io_async_buf_func, req);
+	if (!ret) {
+		io_get_req_task(req);
+		return true;
+	}
+
+	return false;
+}
+
+static int io_iter_do_read(struct io_kiocb *req, struct iov_iter *iter)
+{
+	if (req->file->f_op->read_iter)
+		return call_read_iter(req->file, &req->rw.kiocb, iter);
+	return loop_rw_iter(READ, req->file, &req->rw.kiocb, iter);
+}
+
 static int io_read(struct io_kiocb *req, bool force_nonblock)
 {
 	struct iovec inline_vecs[UIO_FASTIOV], *iovec = inline_vecs;
@@ -2784,10 +2907,7 @@  static int io_read(struct io_kiocb *req, bool force_nonblock)
 		unsigned long nr_segs = iter.nr_segs;
 		ssize_t ret2 = 0;
 
-		if (req->file->f_op->read_iter)
-			ret2 = call_read_iter(req->file, kiocb, &iter);
-		else
-			ret2 = loop_rw_iter(READ, req->file, kiocb, &iter);
+		ret2 = io_iter_do_read(req, &iter);
 
 		/* Catch -EAGAIN return for forced non-blocking submission */
 		if (!force_nonblock || (ret2 != -EAGAIN && ret2 != -EIO)) {
@@ -2799,17 +2919,26 @@  static int io_read(struct io_kiocb *req, bool force_nonblock)
 			ret = io_setup_async_rw(req, io_size, iovec,
 						inline_vecs, &iter);
 			if (ret)
-				goto out_free;
+				goto out;
 			/* any defer here is final, must blocking retry */
 			if (!(req->flags & REQ_F_NOWAIT) &&
 			    !file_can_poll(req->file))
 				req->flags |= REQ_F_MUST_PUNT;
+			/* if we can retry, do so with the callbacks armed */
+			if (io_rw_should_retry(req)) {
+				ret2 = io_iter_do_read(req, &iter);
+				if (ret2 == -EIOCBQUEUED) {
+					goto out;
+				} else if (ret2 != -EAGAIN) {
+					kiocb_done(kiocb, ret2);
+					goto out;
+				}
+			}
+			kiocb->ki_flags &= ~IOCB_WAITQ;
 			return -EAGAIN;
 		}
 	}
-out_free:
-	kfree(iovec);
-	req->flags &= ~REQ_F_NEED_CLEANUP;
+out:
 	return ret;
 }