diff mbox series

io_uring: IORING_OP_TIMEOUT support

Message ID f0488dd6-c32b-be96-9bdc-67099f1f56f8@kernel.dk (mailing list archive)
State New, archived
Headers show
Series io_uring: IORING_OP_TIMEOUT support | expand

Commit Message

Jens Axboe Sept. 17, 2019, 4:03 p.m. UTC
There's been a few requests for functionality similar to io_getevents()
and epoll_wait(), where the user can specify a timeout for waiting on
events. I deliberately did not add support for this through the system
call initially to avoid overloading the args, but I can see that the use
cases for this are valid.

This adds support for IORING_OP_TIMEOUT. If a user wants to get woken
when waiting for events, simply submit one of these timeout commands
with your wait call. This ensures that the application sleeping on the
CQ ring waiting for events will get woken. The timeout command is passed
in a pointer to a struct timespec. Timeouts are relative.

Signed-off-by: Jens Axboe <axboe@kernel.dk>

---

liburing has a test case for this, as well as the helper function to
setup the timeout command.

Comments

Andres Freund Sept. 20, 2019, 4:53 p.m. UTC | #1
Hi,

On 2019-09-17 10:03:58 -0600, Jens Axboe wrote:
> There's been a few requests for functionality similar to io_getevents()
> and epoll_wait(), where the user can specify a timeout for waiting on
> events. I deliberately did not add support for this through the system
> call initially to avoid overloading the args, but I can see that the use
> cases for this are valid.

> This adds support for IORING_OP_TIMEOUT. If a user wants to get woken
> when waiting for events, simply submit one of these timeout commands
> with your wait call. This ensures that the application sleeping on the
> CQ ring waiting for events will get woken. The timeout command is passed
> in a pointer to a struct timespec. Timeouts are relative.

Hm. This interface wouldn't allow to to reliably use a timeout waiting for
io_uring_enter(..., min_complete > 1, ING_ENTER_GETEVENTS, ...)
right?

I can easily imagine usecases where I'd want to submit a bunch of ios
and wait for all of their completion to minimize unnecessary context
switches, as all IOs are required to continue. But with a relatively
small timeout, to allow switching to do other work etc.

- Andres
Jens Axboe Sept. 20, 2019, 8:18 p.m. UTC | #2
On 9/20/19 10:53 AM, Andres Freund wrote:
> Hi,
> 
> On 2019-09-17 10:03:58 -0600, Jens Axboe wrote:
>> There's been a few requests for functionality similar to io_getevents()
>> and epoll_wait(), where the user can specify a timeout for waiting on
>> events. I deliberately did not add support for this through the system
>> call initially to avoid overloading the args, but I can see that the use
>> cases for this are valid.
> 
>> This adds support for IORING_OP_TIMEOUT. If a user wants to get woken
>> when waiting for events, simply submit one of these timeout commands
>> with your wait call. This ensures that the application sleeping on the
>> CQ ring waiting for events will get woken. The timeout command is passed
>> in a pointer to a struct timespec. Timeouts are relative.
> 
> Hm. This interface wouldn't allow to to reliably use a timeout waiting for
> io_uring_enter(..., min_complete > 1, ING_ENTER_GETEVENTS, ...)
> right?

I've got a (unpublished as of yet) version that allows you to wait for N
events, and canceling the timer it met. So that does allow you to reliably
wait for N events.

> I can easily imagine usecases where I'd want to submit a bunch of ios
> and wait for all of their completion to minimize unnecessary context
> switches, as all IOs are required to continue. But with a relatively
> small timeout, to allow switching to do other work etc.

The question is if it's worth it to add support for "wait for these N
exact events", or whether "wait for N events" is enough. The application
needs to read those completions anyway, and could then decide to loop
if it's still missing some events. Downside is that it may mean more
calls to wait, but since the io_uring is rarely shared, it might be
just fine.

Curious on your opinion.
Andres Freund Sept. 20, 2019, 8:56 p.m. UTC | #3
Hi,

On 2019-09-20 14:18:07 -0600, Jens Axboe wrote:
> On 9/20/19 10:53 AM, Andres Freund wrote:
> > Hi,
> > 
> > On 2019-09-17 10:03:58 -0600, Jens Axboe wrote:
> >> There's been a few requests for functionality similar to io_getevents()
> >> and epoll_wait(), where the user can specify a timeout for waiting on
> >> events. I deliberately did not add support for this through the system
> >> call initially to avoid overloading the args, but I can see that the use
> >> cases for this are valid.
> > 
> >> This adds support for IORING_OP_TIMEOUT. If a user wants to get woken
> >> when waiting for events, simply submit one of these timeout commands
> >> with your wait call. This ensures that the application sleeping on the
> >> CQ ring waiting for events will get woken. The timeout command is passed
> >> in a pointer to a struct timespec. Timeouts are relative.
> > 
> > Hm. This interface wouldn't allow to to reliably use a timeout waiting for
> > io_uring_enter(..., min_complete > 1, ING_ENTER_GETEVENTS, ...)
> > right?
> 
> I've got a (unpublished as of yet) version that allows you to wait for N
> events, and canceling the timer it met. So that does allow you to reliably
> wait for N events.

Cool.

I'm not quite sure how to parse "canceling the timer it met".

I assume you mean that one could ask for min_complete, and
IORING_OP_TIMEOUT would interrupt that wait, even if fewer than
min_complete have been collected?  It'd probably be good to return 0
instead of EINTR if at least one event is ready, otherwise it does seem
to make sense.


> > I can easily imagine usecases where I'd want to submit a bunch of ios
> > and wait for all of their completion to minimize unnecessary context
> > switches, as all IOs are required to continue. But with a relatively
> > small timeout, to allow switching to do other work etc.
> 
> The question is if it's worth it to add support for "wait for these N
> exact events", or whether "wait for N events" is enough. The application
> needs to read those completions anyway, and could then decide to loop
> if it's still missing some events. Downside is that it may mean more
> calls to wait, but since the io_uring is rarely shared, it might be
> just fine.

I think "wait for N events" is sufficient. I'm not even sure how one
could safely use "wait for these N exact events", or what precisely it
would mean.  All the usecases for min_complete that I can think of
basically just want to avoid unnecessary userspace transitions if not
enough work has been done to have a chance to finish its task - but if
there's plenty results other than the just submitted ones in the queue
that's also ok.


> , but since the io_uring is rarely shared, it might be just fine.

FWIW, I think we might want to share it between (forked) processes in
postgres, but I'm not sure yet (as in, in my current rough prototype I'm
not yet doing so). Without that it's a lot harder to really benefit from
the queue ordering operations, and sharing also allows to order queue
flushes to later parts of the journal, making it more likely that
connections COMMITing earlier also finish earlier.

Another, fairly crucial, reason is that being able to finish io requests
started by other backends would make it far easier to avoid deadlock
risk between postgres connections / background processes. Otherwise it's
fairly easy to encounter situations where backend A issues a few
prefetch requests and then blocks on some lock held by process B, and B
needs the one of the prefetchted buffers from A to finish IO. There's
more complex workarounds for this, but ...

Greetings,

Andres Freund
Jens Axboe Sept. 20, 2019, 9:26 p.m. UTC | #4
On 9/20/19 2:56 PM, Andres Freund wrote:
> Hi,
> 
> On 2019-09-20 14:18:07 -0600, Jens Axboe wrote:
>> On 9/20/19 10:53 AM, Andres Freund wrote:
>>> Hi,
>>>
>>> On 2019-09-17 10:03:58 -0600, Jens Axboe wrote:
>>>> There's been a few requests for functionality similar to io_getevents()
>>>> and epoll_wait(), where the user can specify a timeout for waiting on
>>>> events. I deliberately did not add support for this through the system
>>>> call initially to avoid overloading the args, but I can see that the use
>>>> cases for this are valid.
>>>
>>>> This adds support for IORING_OP_TIMEOUT. If a user wants to get woken
>>>> when waiting for events, simply submit one of these timeout commands
>>>> with your wait call. This ensures that the application sleeping on the
>>>> CQ ring waiting for events will get woken. The timeout command is passed
>>>> in a pointer to a struct timespec. Timeouts are relative.
>>>
>>> Hm. This interface wouldn't allow to to reliably use a timeout waiting for
>>> io_uring_enter(..., min_complete > 1, ING_ENTER_GETEVENTS, ...)
>>> right?
>>
>> I've got a (unpublished as of yet) version that allows you to wait for N
>> events, and canceling the timer it met. So that does allow you to reliably
>> wait for N events.
> 
> Cool.
> 
> I'm not quite sure how to parse "canceling the timer it met".

s/it/if

> I assume you mean that one could ask for min_complete, and
> IORING_OP_TIMEOUT would interrupt that wait, even if fewer than
> min_complete have been collected?  It'd probably be good to return 0
> instead of EINTR if at least one event is ready, otherwise it does seem
> to make sense.

Right, what I mean is if you ask for a timeout for N events, if N events
pass before the timeout expires, then the timeout essentially does
nothing. You'd still get a completion event, as with all SQEs, but it
would not timeout and it'd happen right after that Nth event.

The wait part always returns 0 if we have events, a potential error is
only returned if the CQ ring is empty. That's the same as what we have
now.

But sounds like we are in violent agreement. I'll post a new patch for
this soonish.

>>> I can easily imagine usecases where I'd want to submit a bunch of ios
>>> and wait for all of their completion to minimize unnecessary context
>>> switches, as all IOs are required to continue. But with a relatively
>>> small timeout, to allow switching to do other work etc.
>>
>> The question is if it's worth it to add support for "wait for these N
>> exact events", or whether "wait for N events" is enough. The application
>> needs to read those completions anyway, and could then decide to loop
>> if it's still missing some events. Downside is that it may mean more
>> calls to wait, but since the io_uring is rarely shared, it might be
>> just fine.
> 
> I think "wait for N events" is sufficient. I'm not even sure how one
> could safely use "wait for these N exact events", or what precisely it
> would mean.  All the usecases for min_complete that I can think of
> basically just want to avoid unnecessary userspace transitions if not
> enough work has been done to have a chance to finish its task - but if
> there's plenty results other than the just submitted ones in the queue
> that's also ok.

OK, that's exactly my thinking as well.

You could wait for specific events, but you'd have to tag the events
somehow to do that. I'd rather not add functionality like that unless
absolutely necessary, especially since this kind of functionality could
just be added to liburing if needed (or coded in the application
itself).

>> , but since the io_uring is rarely shared, it might be just fine.
> 
> FWIW, I think we might want to share it between (forked) processes in
> postgres, but I'm not sure yet (as in, in my current rough prototype I'm
> not yet doing so). Without that it's a lot harder to really benefit from
> the queue ordering operations, and sharing also allows to order queue
> flushes to later parts of the journal, making it more likely that
> connections COMMITing earlier also finish earlier.
> 
> Another, fairly crucial, reason is that being able to finish io requests
> started by other backends would make it far easier to avoid deadlock
> risk between postgres connections / background processes. Otherwise it's
> fairly easy to encounter situations where backend A issues a few
> prefetch requests and then blocks on some lock held by process B, and B
> needs the one of the prefetchted buffers from A to finish IO. There's
> more complex workarounds for this, but ...

Sharing is fine, as long as you mutually exclude reading the SQ and CQ
rings, of course. And if it makes it easier to do for queue ordering,
then by all means you should do that.
Jens Axboe Sept. 20, 2019, 11:10 p.m. UTC | #5
On 9/20/19 3:26 PM, Jens Axboe wrote:
> But sounds like we are in violent agreement. I'll post a new patch for
> this soonish.

How about this? You pass in number of events in sqe->off. If that amount
of events happen before the timer expires, then the timer is deleted and
the completion posted. The timeout cqe->res will be -ETIME if the timer
expired, and 0 if it got removed due to hitting the number of events.

Lightly tested, works for me.

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 05a299e80159..88d4584f12cd 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -200,6 +200,7 @@ struct io_ring_ctx {
 		struct io_uring_sqe	*sq_sqes;
 
 		struct list_head	defer_list;
+		struct list_head	timeout_list;
 	} ____cacheline_aligned_in_smp;
 
 	/* IO offload */
@@ -216,6 +217,7 @@ struct io_ring_ctx {
 		struct wait_queue_head	cq_wait;
 		struct fasync_struct	*cq_fasync;
 		struct eventfd_ctx	*cq_ev_fd;
+		atomic_t		cq_timeouts;
 	} ____cacheline_aligned_in_smp;
 
 	struct io_rings	*rings;
@@ -283,6 +285,12 @@ struct io_poll_iocb {
 	struct wait_queue_entry		wait;
 };
 
+struct io_timeout {
+	struct file			*file;
+	unsigned			count;
+	struct hrtimer			timer;
+};
+
 /*
  * NOTE! Each of the iocb union members has the file pointer
  * as the first entry in their struct definition. So you can
@@ -294,6 +302,7 @@ struct io_kiocb {
 		struct file		*file;
 		struct kiocb		rw;
 		struct io_poll_iocb	poll;
+		struct io_timeout	timeout;
 	};
 
 	struct sqe_submit	submit;
@@ -344,6 +353,8 @@ struct io_submit_state {
 };
 
 static void io_sq_wq_submit_work(struct work_struct *work);
+static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
+				 long res);
 static void __io_free_req(struct io_kiocb *req);
 
 static struct kmem_cache *req_cachep;
@@ -400,6 +411,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 	INIT_LIST_HEAD(&ctx->poll_list);
 	INIT_LIST_HEAD(&ctx->cancel_list);
 	INIT_LIST_HEAD(&ctx->defer_list);
+	INIT_LIST_HEAD(&ctx->timeout_list);
 	return ctx;
 }
 
@@ -460,10 +472,40 @@ static inline void io_queue_async_work(struct io_ring_ctx *ctx,
 	queue_work(ctx->sqo_wq[rw], &req->work);
 }
 
+static void io_kill_timeout(struct io_kiocb *req)
+{
+	int ret;
+
+	ret = hrtimer_try_to_cancel(&req->timeout.timer);
+	if (ret != -1) {
+		list_del(&req->list);
+		io_cqring_fill_event(req->ctx, req->user_data, 0);
+		__io_free_req(req);
+	}
+}
+
+static void io_kill_timeouts(struct io_ring_ctx *ctx)
+{
+	struct io_kiocb *req, *tmp;
+
+	spin_lock_irq(&ctx->completion_lock);
+	list_for_each_entry_safe(req, tmp, &ctx->timeout_list, list)
+		io_kill_timeout(req);
+	spin_unlock_irq(&ctx->completion_lock);
+}
+
 static void io_commit_cqring(struct io_ring_ctx *ctx)
 {
 	struct io_kiocb *req;
 
+	if (!list_empty(&ctx->timeout_list)) {
+		struct io_kiocb *tmp;
+
+		list_for_each_entry_safe(req, tmp, &ctx->timeout_list, list)
+			if (!--req->timeout.count)
+				io_kill_timeout(req);
+	}
+
 	__io_commit_cqring(ctx);
 
 	while ((req = io_get_deferred_req(ctx)) != NULL) {
@@ -1765,6 +1807,60 @@ static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 	return ipt.error;
 }
 
+static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
+{
+	struct io_ring_ctx *ctx;
+	struct io_kiocb *req;
+	unsigned long flags;
+
+	req = container_of(timer, struct io_kiocb, timeout.timer);
+	ctx = req->ctx;
+	atomic_inc(&ctx->cq_timeouts);
+
+	spin_lock_irqsave(&ctx->completion_lock, flags);
+	list_del(&req->list);
+
+	io_cqring_fill_event(ctx, req->user_data, -ETIME);
+	io_commit_cqring(ctx);
+	spin_unlock_irqrestore(&ctx->completion_lock, flags);
+
+	io_cqring_ev_posted(ctx);
+
+	io_put_req(req);
+	return HRTIMER_NORESTART;
+}
+
+static int io_timeout(struct io_kiocb *req, const struct io_uring_sqe *sqe)
+{
+	struct io_ring_ctx *ctx = req->ctx;
+	struct timespec ts;
+
+	if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
+		return -EINVAL;
+	if (sqe->flags || sqe->ioprio || sqe->buf_index || sqe->len != 1)
+		return -EINVAL;
+	if (copy_from_user(&ts, (void __user *) sqe->addr, sizeof(ts)))
+		return -EFAULT;
+
+	/*
+	 * sqe->off holds how many events that need to occur for this
+	 * timeout event to be satisfied.
+	 */
+	req->timeout.count = READ_ONCE(sqe->off);
+	if (!req->timeout.count)
+		req->timeout.count = 1;
+
+	spin_lock_irq(&ctx->completion_lock);
+	list_add_tail(&req->list, &ctx->timeout_list);
+	spin_unlock_irq(&ctx->completion_lock);
+
+	hrtimer_init(&req->timeout.timer, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
+	req->timeout.timer.function = io_timeout_fn;
+	hrtimer_start(&req->timeout.timer, timespec_to_ktime(ts),
+			HRTIMER_MODE_REL);
+	return 0;
+}
+
 static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req,
 			const struct io_uring_sqe *sqe)
 {
@@ -1842,6 +1938,9 @@ static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 	case IORING_OP_RECVMSG:
 		ret = io_recvmsg(req, s->sqe, force_nonblock);
 		break;
+	case IORING_OP_TIMEOUT:
+		ret = io_timeout(req, s->sqe);
+		break;
 	default:
 		ret = -EINVAL;
 		break;
@@ -2599,6 +2698,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 			  const sigset_t __user *sig, size_t sigsz)
 {
 	struct io_rings *rings = ctx->rings;
+	unsigned nr_timeouts;
 	int ret;
 
 	if (io_cqring_events(rings) >= min_events)
@@ -2617,7 +2717,15 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 			return ret;
 	}
 
-	ret = wait_event_interruptible(ctx->wait, io_cqring_events(rings) >= min_events);
+	nr_timeouts = atomic_read(&ctx->cq_timeouts);
+	/*
+	 * Return if we have enough events, or if a timeout occured since
+	 * we started waiting. For timeouts, we always want to return to
+	 * userspace.
+	 */
+	ret = wait_event_interruptible(ctx->wait,
+				io_cqring_events(rings) >= min_events ||
+				atomic_read(&ctx->cq_timeouts) != nr_timeouts);
 	restore_saved_sigmask_unless(ret == -ERESTARTSYS);
 	if (ret == -ERESTARTSYS)
 		ret = -EINTR;
@@ -3288,6 +3396,7 @@ static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
 	percpu_ref_kill(&ctx->refs);
 	mutex_unlock(&ctx->uring_lock);
 
+	io_kill_timeouts(ctx);
 	io_poll_remove_all(ctx);
 	io_iopoll_reap_events(ctx);
 	wait_for_completion(&ctx->ctx_done);
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index 96ee9d94b73e..cf3101dc6b1e 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -61,6 +61,7 @@ struct io_uring_sqe {
 #define IORING_OP_SYNC_FILE_RANGE	8
 #define IORING_OP_SENDMSG	9
 #define IORING_OP_RECVMSG	10
+#define IORING_OP_TIMEOUT	11
 
 /*
  * sqe->fsync_flags
Jens Axboe Sept. 20, 2019, 11:52 p.m. UTC | #6
On 9/20/19 5:10 PM, Jens Axboe wrote:
> On 9/20/19 3:26 PM, Jens Axboe wrote:
>> But sounds like we are in violent agreement. I'll post a new patch for
>> this soonish.
> 
> How about this? You pass in number of events in sqe->off. If that amount
> of events happen before the timer expires, then the timer is deleted and
> the completion posted. The timeout cqe->res will be -ETIME if the timer
> expired, and 0 if it got removed due to hitting the number of events.
> 
> Lightly tested, works for me.

Found a missing increment case when I wrote up the test code. This
one passes the test code I put in the liburing 'timeout' branch.


diff --git a/fs/io_uring.c b/fs/io_uring.c
index 05a299e80159..3ae9489f6fc1 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -200,6 +200,7 @@ struct io_ring_ctx {
 		struct io_uring_sqe	*sq_sqes;
 
 		struct list_head	defer_list;
+		struct list_head	timeout_list;
 	} ____cacheline_aligned_in_smp;
 
 	/* IO offload */
@@ -216,6 +217,7 @@ struct io_ring_ctx {
 		struct wait_queue_head	cq_wait;
 		struct fasync_struct	*cq_fasync;
 		struct eventfd_ctx	*cq_ev_fd;
+		atomic_t		cq_timeouts;
 	} ____cacheline_aligned_in_smp;
 
 	struct io_rings	*rings;
@@ -283,6 +285,12 @@ struct io_poll_iocb {
 	struct wait_queue_entry		wait;
 };
 
+struct io_timeout {
+	struct file			*file;
+	unsigned			count;
+	struct hrtimer			timer;
+};
+
 /*
  * NOTE! Each of the iocb union members has the file pointer
  * as the first entry in their struct definition. So you can
@@ -294,6 +302,7 @@ struct io_kiocb {
 		struct file		*file;
 		struct kiocb		rw;
 		struct io_poll_iocb	poll;
+		struct io_timeout	timeout;
 	};
 
 	struct sqe_submit	submit;
@@ -344,6 +353,8 @@ struct io_submit_state {
 };
 
 static void io_sq_wq_submit_work(struct work_struct *work);
+static void io_cqring_fill_event(struct io_ring_ctx *ctx, u64 ki_user_data,
+				 long res);
 static void __io_free_req(struct io_kiocb *req);
 
 static struct kmem_cache *req_cachep;
@@ -400,6 +411,7 @@ static struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 	INIT_LIST_HEAD(&ctx->poll_list);
 	INIT_LIST_HEAD(&ctx->cancel_list);
 	INIT_LIST_HEAD(&ctx->defer_list);
+	INIT_LIST_HEAD(&ctx->timeout_list);
 	return ctx;
 }
 
@@ -460,10 +472,41 @@ static inline void io_queue_async_work(struct io_ring_ctx *ctx,
 	queue_work(ctx->sqo_wq[rw], &req->work);
 }
 
+static void io_kill_timeout(struct io_kiocb *req)
+{
+	int ret;
+
+	ret = hrtimer_try_to_cancel(&req->timeout.timer);
+	if (ret != -1) {
+		atomic_inc(&req->ctx->cq_timeouts);
+		list_del(&req->list);
+		io_cqring_fill_event(req->ctx, req->user_data, 0);
+		__io_free_req(req);
+	}
+}
+
+static void io_kill_timeouts(struct io_ring_ctx *ctx)
+{
+	struct io_kiocb *req, *tmp;
+
+	spin_lock_irq(&ctx->completion_lock);
+	list_for_each_entry_safe(req, tmp, &ctx->timeout_list, list)
+		io_kill_timeout(req);
+	spin_unlock_irq(&ctx->completion_lock);
+}
+
 static void io_commit_cqring(struct io_ring_ctx *ctx)
 {
 	struct io_kiocb *req;
 
+	if (!list_empty(&ctx->timeout_list)) {
+		struct io_kiocb *tmp;
+
+		list_for_each_entry_safe(req, tmp, &ctx->timeout_list, list)
+			if (!--req->timeout.count)
+				io_kill_timeout(req);
+	}
+
 	__io_commit_cqring(ctx);
 
 	while ((req = io_get_deferred_req(ctx)) != NULL) {
@@ -1765,6 +1808,60 @@ static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 	return ipt.error;
 }
 
+static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
+{
+	struct io_ring_ctx *ctx;
+	struct io_kiocb *req;
+	unsigned long flags;
+
+	req = container_of(timer, struct io_kiocb, timeout.timer);
+	ctx = req->ctx;
+	atomic_inc(&ctx->cq_timeouts);
+
+	spin_lock_irqsave(&ctx->completion_lock, flags);
+	list_del(&req->list);
+
+	io_cqring_fill_event(ctx, req->user_data, -ETIME);
+	io_commit_cqring(ctx);
+	spin_unlock_irqrestore(&ctx->completion_lock, flags);
+
+	io_cqring_ev_posted(ctx);
+
+	io_put_req(req);
+	return HRTIMER_NORESTART;
+}
+
+static int io_timeout(struct io_kiocb *req, const struct io_uring_sqe *sqe)
+{
+	struct io_ring_ctx *ctx = req->ctx;
+	struct timespec ts;
+
+	if (unlikely(ctx->flags & IORING_SETUP_IOPOLL))
+		return -EINVAL;
+	if (sqe->flags || sqe->ioprio || sqe->buf_index || sqe->len != 1)
+		return -EINVAL;
+	if (copy_from_user(&ts, (void __user *) sqe->addr, sizeof(ts)))
+		return -EFAULT;
+
+	/*
+	 * sqe->off holds how many events that need to occur for this
+	 * timeout event to be satisfied.
+	 */
+	req->timeout.count = READ_ONCE(sqe->off);
+	if (!req->timeout.count)
+		req->timeout.count = 1;
+
+	spin_lock_irq(&ctx->completion_lock);
+	list_add_tail(&req->list, &ctx->timeout_list);
+	spin_unlock_irq(&ctx->completion_lock);
+
+	hrtimer_init(&req->timeout.timer, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
+	req->timeout.timer.function = io_timeout_fn;
+	hrtimer_start(&req->timeout.timer, timespec_to_ktime(ts),
+			HRTIMER_MODE_REL);
+	return 0;
+}
+
 static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req,
 			const struct io_uring_sqe *sqe)
 {
@@ -1842,6 +1939,9 @@ static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 	case IORING_OP_RECVMSG:
 		ret = io_recvmsg(req, s->sqe, force_nonblock);
 		break;
+	case IORING_OP_TIMEOUT:
+		ret = io_timeout(req, s->sqe);
+		break;
 	default:
 		ret = -EINVAL;
 		break;
@@ -2599,6 +2699,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 			  const sigset_t __user *sig, size_t sigsz)
 {
 	struct io_rings *rings = ctx->rings;
+	unsigned nr_timeouts;
 	int ret;
 
 	if (io_cqring_events(rings) >= min_events)
@@ -2617,7 +2718,15 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 			return ret;
 	}
 
-	ret = wait_event_interruptible(ctx->wait, io_cqring_events(rings) >= min_events);
+	nr_timeouts = atomic_read(&ctx->cq_timeouts);
+	/*
+	 * Return if we have enough events, or if a timeout occured since
+	 * we started waiting. For timeouts, we always want to return to
+	 * userspace.
+	 */
+	ret = wait_event_interruptible(ctx->wait,
+				io_cqring_events(rings) >= min_events ||
+				atomic_read(&ctx->cq_timeouts) != nr_timeouts);
 	restore_saved_sigmask_unless(ret == -ERESTARTSYS);
 	if (ret == -ERESTARTSYS)
 		ret = -EINTR;
@@ -3288,6 +3397,7 @@ static void io_ring_ctx_wait_and_kill(struct io_ring_ctx *ctx)
 	percpu_ref_kill(&ctx->refs);
 	mutex_unlock(&ctx->uring_lock);
 
+	io_kill_timeouts(ctx);
 	io_poll_remove_all(ctx);
 	io_iopoll_reap_events(ctx);
 	wait_for_completion(&ctx->ctx_done);
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index 96ee9d94b73e..cf3101dc6b1e 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -61,6 +61,7 @@ struct io_uring_sqe {
 #define IORING_OP_SYNC_FILE_RANGE	8
 #define IORING_OP_SENDMSG	9
 #define IORING_OP_RECVMSG	10
+#define IORING_OP_TIMEOUT	11
 
 /*
  * sqe->fsync_flags
diff mbox series

Patch

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 0dadbdbead0f..e169f5e8cd12 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -283,6 +283,10 @@  struct io_poll_iocb {
 	struct wait_queue_entry		wait;
 };
 
+struct io_timeout {
+	struct hrtimer timer;
+};
+
 /*
  * NOTE! Each of the iocb union members has the file pointer
  * as the first entry in their struct definition. So you can
@@ -294,6 +298,7 @@  struct io_kiocb {
 		struct file		*file;
 		struct kiocb		rw;
 		struct io_poll_iocb	poll;
+		struct io_timeout	timeout;
 	};
 
 	struct sqe_submit	submit;
@@ -1765,6 +1770,35 @@  static int io_poll_add(struct io_kiocb *req, const struct io_uring_sqe *sqe)
 	return ipt.error;
 }
 
+static enum hrtimer_restart io_timeout_fn(struct hrtimer *timer)
+{
+	struct io_kiocb *req;
+
+	req = container_of(timer, struct io_kiocb, timeout.timer);
+	io_cqring_add_event(req->ctx, req->user_data, 0);
+	io_put_req(req);
+	return HRTIMER_NORESTART;
+}
+
+static int io_timeout(struct io_kiocb *req, const struct io_uring_sqe *sqe)
+{
+	struct timespec ts;
+	ktime_t kt;
+
+	if (sqe->flags || sqe->ioprio || sqe->off || sqe->buf_index)
+		return -EINVAL;
+	if (sqe->len != 1)
+		return -EINVAL;
+	if (copy_from_user(&ts, (void __user *) sqe->addr, sizeof(ts)))
+		return -EFAULT;
+
+	hrtimer_init(&req->timeout.timer, CLOCK_MONOTONIC, HRTIMER_MODE_REL);
+	req->timeout.timer.function = io_timeout_fn;
+	kt = timespec_to_ktime(ts);
+	hrtimer_start(&req->timeout.timer, kt, HRTIMER_MODE_REL);
+	return 0;
+}
+
 static int io_req_defer(struct io_ring_ctx *ctx, struct io_kiocb *req,
 			const struct io_uring_sqe *sqe)
 {
@@ -1842,6 +1876,9 @@  static int __io_submit_sqe(struct io_ring_ctx *ctx, struct io_kiocb *req,
 	case IORING_OP_RECVMSG:
 		ret = io_recvmsg(req, s->sqe, force_nonblock);
 		break;
+	case IORING_OP_TIMEOUT:
+		ret = io_timeout(req, s->sqe);
+		break;
 	default:
 		ret = -EINVAL;
 		break;
diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index 96ee9d94b73e..cf3101dc6b1e 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -61,6 +61,7 @@  struct io_uring_sqe {
 #define IORING_OP_SYNC_FILE_RANGE	8
 #define IORING_OP_SENDMSG	9
 #define IORING_OP_RECVMSG	10
+#define IORING_OP_TIMEOUT	11
 
 /*
  * sqe->fsync_flags