Message ID | 20241122161645.494868-3-axboe@kernel.dk |
---|---|
State | New |
Series | task work cleanups |
On 11/22/24 16:12, Jens Axboe wrote: ... > static inline void io_req_local_work_add(struct io_kiocb *req, > struct io_ring_ctx *ctx, > - unsigned flags) > + unsigned tw_flags) > { > - unsigned nr_wait, nr_tw, nr_tw_prev; > - struct llist_node *head; > + unsigned nr_tw, nr_tw_prev, nr_wait; > + unsigned long flags; > > /* See comment above IO_CQ_WAKE_INIT */ > BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES); > > /* > - * We don't know how many reuqests is there in the link and whether > - * they can even be queued lazily, fall back to non-lazy. > + * We don't know how many requests are in the link and whether they can > + * even be queued lazily, fall back to non-lazy. > */ > if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK)) > - flags &= ~IOU_F_TWQ_LAZY_WAKE; > + tw_flags &= ~IOU_F_TWQ_LAZY_WAKE; > > - guard(rcu)(); protects against ctx->task deallocation, see a comment in io_ring_exit_work() -> synchronize_rcu() > + spin_lock_irqsave(&ctx->work_lock, flags); > + wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list); > + nr_tw_prev = ctx->work_items++; Is there a good reason why it changes the semantics of what's stored across adds? It was assigning a corrected nr_tw, this one will start heavily spamming with wake_up() in some cases. > + spin_unlock_irqrestore(&ctx->work_lock, flags); > > - head = READ_ONCE(ctx->work_llist.first); > - do { > - nr_tw_prev = 0; > - if (head) { > - struct io_kiocb *first_req = container_of(head, > - struct io_kiocb, > - io_task_work.node); > - /* > - * Might be executed at any moment, rely on > - * SLAB_TYPESAFE_BY_RCU to keep it alive. > - */ > - nr_tw_prev = READ_ONCE(first_req->nr_tw); > - } > - > - /* > - * Theoretically, it can overflow, but that's fine as one of > - * previous adds should've tried to wake the task. > - */ > - nr_tw = nr_tw_prev + 1; > - if (!(flags & IOU_F_TWQ_LAZY_WAKE)) > - nr_tw = IO_CQ_WAKE_FORCE; > - > - req->nr_tw = nr_tw; > - req->io_task_work.node.next = head; > - } while (!try_cmpxchg(&ctx->work_llist.first, &head, > - &req->io_task_work.node)); > - > - /* > - * cmpxchg implies a full barrier, which pairs with the barrier > - * in set_current_state() on the io_cqring_wait() side. It's used > - * to ensure that either we see updated ->cq_wait_nr, or waiters > - * going to sleep will observe the work added to the list, which > - * is similar to the wait/wawke task state sync. > - */ > + nr_tw = nr_tw_prev + 1; > + if (!(tw_flags & IOU_F_TWQ_LAZY_WAKE)) > + nr_tw = IO_CQ_WAKE_FORCE; > > - if (!head) { > + if (!nr_tw_prev) { > if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) > atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); > if (ctx->has_evfd) > io_eventfd_signal(ctx); > } > > + /* > + * We need a barrier after unlock, which pairs with the barrier > + * in set_current_state() on the io_cqring_wait() side. It's used > + * to ensure that either we see updated ->cq_wait_nr, or waiters > + * going to sleep will observe the work added to the list, which > + * is similar to the wait/wake task state sync. > + */ > + smp_mb(); > nr_wait = atomic_read(&ctx->cq_wait_nr); > /* not enough or no one is waiting */ > if (nr_tw < nr_wait) > @@ -1253,11 +1233,27 @@ void io_req_task_work_add_remote(struct io_kiocb *req, struct io_ring_ctx *ctx,
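For readers following the guard(rcu)() point above, here is a minimal, illustrative sketch of the existence-guarantee pattern being referenced -- simplified names and structure, not the literal io_uring code:

        /* illustration only: add side dereferences the submitter task under RCU */
        static void sketch_wake_submitter(struct io_ring_ctx *ctx)
        {
                rcu_read_lock();
                /*
                 * Ring exit may drop the task; the RCU read-side section keeps
                 * it valid until the wakeup is done.
                 */
                if (ctx->submitter_task)
                        wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
                rcu_read_unlock();
        }

        /* exit side, cf. the comment near io_ring_exit_work() */
        static void sketch_exit_sync(struct io_ring_ctx *ctx)
        {
                /* wait out any adder still inside the read-side section above */
                synchronize_rcu();
                put_task_struct(ctx->submitter_task);
        }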
On 11/22/24 10:07 AM, Pavel Begunkov wrote: > On 11/22/24 16:12, Jens Axboe wrote: > ... >> static inline void io_req_local_work_add(struct io_kiocb *req, >> struct io_ring_ctx *ctx, >> - unsigned flags) >> + unsigned tw_flags) >> { >> - unsigned nr_wait, nr_tw, nr_tw_prev; >> - struct llist_node *head; >> + unsigned nr_tw, nr_tw_prev, nr_wait; >> + unsigned long flags; >> /* See comment above IO_CQ_WAKE_INIT */ >> BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES); >> /* >> - * We don't know how many reuqests is there in the link and whether >> - * they can even be queued lazily, fall back to non-lazy. >> + * We don't know how many requests are in the link and whether they can >> + * even be queued lazily, fall back to non-lazy. >> */ >> if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK)) >> - flags &= ~IOU_F_TWQ_LAZY_WAKE; >> + tw_flags &= ~IOU_F_TWQ_LAZY_WAKE; >> - guard(rcu)(); > > protects against ctx->task deallocation, see a comment in > io_ring_exit_work() -> synchronize_rcu() Yeah that's just an editing mistake. >> + spin_lock_irqsave(&ctx->work_lock, flags); >> + wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list); >> + nr_tw_prev = ctx->work_items++; > > Is there a good reason why it changes the semantics of > what's stored across adds? It was assigning a corrected > nr_tw, this one will start heavily spamming with wake_up() > in some cases. Not sure I follow, how so? nr_tw_prev will be the previous count, just like before. Except we won't need to dig into the list to find it, we have it readily available. nr_tw will be the current code, or force wake if needed. As before.
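Condensing the hunk above may make the semantics easier to see. The sketch below is pulled from the patch, with the IORING_SETUP_TASKRUN_FLAG and eventfd signalling elided, and the tail of the function past the quoted context assumed to keep the existing wakeup logic:

        static inline void sketch_local_work_add(struct io_kiocb *req,
                                                 struct io_ring_ctx *ctx,
                                                 unsigned tw_flags)
        {
                unsigned nr_tw, nr_tw_prev, nr_wait;
                unsigned long flags;

                spin_lock_irqsave(&ctx->work_lock, flags);
                wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
                nr_tw_prev = ctx->work_items++;         /* count before this add */
                spin_unlock_irqrestore(&ctx->work_lock, flags);

                nr_tw = nr_tw_prev + 1;                 /* count including this add */
                if (!(tw_flags & IOU_F_TWQ_LAZY_WAKE))
                        nr_tw = IO_CQ_WAKE_FORCE;       /* non-lazy: always wake */

                /* pairs with the barrier in set_current_state() on the wait side */
                smp_mb();
                nr_wait = atomic_read(&ctx->cq_wait_nr);
                if (nr_tw < nr_wait)
                        return;                 /* not enough, or no one is waiting */
                /* existing wakeup path, not shown in the quoted hunk */
                wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
        }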
On 11/22/24 17:11, Jens Axboe wrote: > On 11/22/24 10:07 AM, Pavel Begunkov wrote: >> On 11/22/24 16:12, Jens Axboe wrote: >> ... >>> static inline void io_req_local_work_add(struct io_kiocb *req, >>> struct io_ring_ctx *ctx, >>> - unsigned flags) >>> + unsigned tw_flags) >>> { >>> - unsigned nr_wait, nr_tw, nr_tw_prev; >>> - struct llist_node *head; >>> + unsigned nr_tw, nr_tw_prev, nr_wait; >>> + unsigned long flags; >>> /* See comment above IO_CQ_WAKE_INIT */ >>> BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES); >>> /* >>> - * We don't know how many reuqests is there in the link and whether >>> - * they can even be queued lazily, fall back to non-lazy. >>> + * We don't know how many requests are in the link and whether they can >>> + * even be queued lazily, fall back to non-lazy. >>> */ >>> if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK)) >>> - flags &= ~IOU_F_TWQ_LAZY_WAKE; >>> + tw_flags &= ~IOU_F_TWQ_LAZY_WAKE; >>> - guard(rcu)(); >> >> protects against ctx->task deallocation, see a comment in >> io_ring_exit_work() -> synchronize_rcu() > > Yeah that's just an editing mistake. > >>> + spin_lock_irqsave(&ctx->work_lock, flags); >>> + wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list); >>> + nr_tw_prev = ctx->work_items++; >> >> Is there a good reason why it changes the semantics of >> what's stored across adds? It was assigning a corrected >> nr_tw, this one will start heavily spamming with wake_up() >> in some cases. > > Not sure I follow, how so? nr_tw_prev will be the previous count, just > like before. Except we won't need to dig into the list to find it, we > have it readily available. nr_tw will be the current code, or force wake > if needed. As before. The problem is what it stores, not how and where. Before req->nr_tw could've been set to IO_CQ_WAKE_FORCE, in which case following requests are not going to attempt waking up the task, now work_items is just a counter. Let's say you've got a bunch of non-lazy adds coming close to each other. The first sets IO_CQ_WAKE_FORCE and wakes the task, and others just queue themselves in the list. Now, every single one of them will try to wake_up() as long as ->cq_wait_nr is large enough.
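To make that scenario concrete, a worked example (numbers illustrative): assume a waiter with cq_wait_nr = 8, three non-lazy adds arriving back to back, and IO_CQ_WAKE_FORCE far larger than any legal cq_wait_nr.

        /*
         * Old llist scheme, nr_tw stored in the request at the list head:
         *   add #1: nr_tw_prev = 0     -> nr_tw = FORCE -> wake_up()
         *   add #2: nr_tw_prev = FORCE -> prev >= nr_wait -> wakeup skipped
         *   add #3: nr_tw_prev = FORCE -> prev >= nr_wait -> wakeup skipped
         *
         * New scheme, work_items as a plain item counter:
         *   add #1: nr_tw_prev = 0 -> nr_tw = FORCE -> wake_up()
         *   add #2: nr_tw_prev = 1 -> nr_tw = FORCE, 1 < 8 -> wake_up() again
         *   add #3: nr_tw_prev = 2 -> nr_tw = FORCE, 2 < 8 -> wake_up() again
         */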
On 11/22/24 10:25 AM, Pavel Begunkov wrote: > On 11/22/24 17:11, Jens Axboe wrote: >> On 11/22/24 10:07 AM, Pavel Begunkov wrote: >>> On 11/22/24 16:12, Jens Axboe wrote: >>> ... >>>> static inline void io_req_local_work_add(struct io_kiocb *req, >>>> struct io_ring_ctx *ctx, >>>> - unsigned flags) >>>> + unsigned tw_flags) >>>> { >>>> - unsigned nr_wait, nr_tw, nr_tw_prev; >>>> - struct llist_node *head; >>>> + unsigned nr_tw, nr_tw_prev, nr_wait; >>>> + unsigned long flags; >>>> /* See comment above IO_CQ_WAKE_INIT */ >>>> BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES); >>>> /* >>>> - * We don't know how many reuqests is there in the link and whether >>>> - * they can even be queued lazily, fall back to non-lazy. >>>> + * We don't know how many requests are in the link and whether they can >>>> + * even be queued lazily, fall back to non-lazy. >>>> */ >>>> if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK)) >>>> - flags &= ~IOU_F_TWQ_LAZY_WAKE; >>>> + tw_flags &= ~IOU_F_TWQ_LAZY_WAKE; >>>> - guard(rcu)(); >>> >>> protects against ctx->task deallocation, see a comment in >>> io_ring_exit_work() -> synchronize_rcu() >> >> Yeah that's just an editing mistake. >> >>>> + spin_lock_irqsave(&ctx->work_lock, flags); >>>> + wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list); >>>> + nr_tw_prev = ctx->work_items++; >>> >>> Is there a good reason why it changes the semantics of >>> what's stored across adds? It was assigning a corrected >>> nr_tw, this one will start heavily spamming with wake_up() >>> in some cases. >> >> Not sure I follow, how so? nr_tw_prev will be the previous count, just >> like before. Except we won't need to dig into the list to find it, we >> have it readily available. nr_tw will be the current code, or force wake >> if needed. As before. > > The problem is what it stores, not how and where. Before req->nr_tw > could've been set to IO_CQ_WAKE_FORCE, in which case following > requests are not going to attempt waking up the task, now work_items > is just a counter. > > Let's say you've got a bunch of non-lazy adds coming close to each > other. The first sets IO_CQ_WAKE_FORCE and wakes the task, and > others just queue themselves in the list. Now, every single one > of them will try to wake_up() as long as ->cq_wait_nr is large > enough. If we really care about the non-lazy path as much, we can just use the same storing scheme as we did in req->nr_tw, except in ->work_items instead. Not a big deal imho.
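A rough, hypothetical sketch of that storing scheme -- not a posted patch -- where ctx->work_items carries the same "corrected" value the old code kept in req->nr_tw, so a later add can tell that a previous non-lazy add already forced the wakeup. The parts of the patch that rely on work_items being an exact item count would need adjusting:

        static inline void sketch_local_work_add_sticky(struct io_kiocb *req,
                                                        struct io_ring_ctx *ctx,
                                                        unsigned tw_flags)
        {
                unsigned nr_tw, nr_tw_prev, nr_wait;
                unsigned long flags;

                spin_lock_irqsave(&ctx->work_lock, flags);
                wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
                nr_tw_prev = ctx->work_items;
                nr_tw = nr_tw_prev + 1;
                if (!(tw_flags & IOU_F_TWQ_LAZY_WAKE))
                        nr_tw = IO_CQ_WAKE_FORCE;
                ctx->work_items = nr_tw;        /* sticky once a wakeup was forced */
                spin_unlock_irqrestore(&ctx->work_lock, flags);

                smp_mb();       /* pairs with set_current_state() on the wait side */
                nr_wait = atomic_read(&ctx->cq_wait_nr);
                if (nr_tw < nr_wait)
                        return;                 /* not enough, or no one is waiting */
                if (nr_tw_prev >= nr_wait)
                        return;                 /* a previous add already woke the task */
                wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
        }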
diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h index 011860ade268..e9ba99cb0ed0 100644 --- a/include/linux/io_uring_types.h +++ b/include/linux/io_uring_types.h @@ -335,8 +335,9 @@ struct io_ring_ctx { * regularly bounce b/w CPUs. */ struct { - struct llist_head work_llist; - struct llist_head retry_llist; + struct io_wq_work_list work_list; + spinlock_t work_lock; + int work_items; unsigned long check_cq; atomic_t cq_wait_nr; atomic_t cq_timeouts; @@ -566,7 +567,11 @@ enum { typedef void (*io_req_tw_func_t)(struct io_kiocb *req, struct io_tw_state *ts); struct io_task_work { - struct llist_node node; + /* DEFER_TASKRUN uses work_node, regular task_work node */ + union { + struct io_wq_work_node work_node; + struct llist_node node; + }; io_req_tw_func_t func; }; @@ -622,8 +627,6 @@ struct io_kiocb { */ u16 buf_index; - unsigned nr_tw; - /* REQ_F_* flags */ io_req_flags_t flags; diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c index c3a7d0197636..b7eb962e9872 100644 --- a/io_uring/io_uring.c +++ b/io_uring/io_uring.c @@ -339,7 +339,8 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p) INIT_LIST_HEAD(&ctx->defer_list); INIT_LIST_HEAD(&ctx->timeout_list); INIT_LIST_HEAD(&ctx->ltimeout_list); - init_llist_head(&ctx->work_llist); + INIT_WQ_LIST(&ctx->work_list); + spin_lock_init(&ctx->work_lock); INIT_LIST_HEAD(&ctx->tctx_list); ctx->submit_state.free_list.next = NULL; INIT_HLIST_HEAD(&ctx->waitid_list); @@ -1066,25 +1067,31 @@ struct llist_node *io_handle_tw_list(struct llist_node *node, return node; } -static __cold void __io_fallback_tw(struct llist_node *node, bool sync) +static __cold void __io_fallback_tw(struct io_kiocb *req, bool sync, + struct io_ring_ctx **last_ctx) { + if (sync && *last_ctx != req->ctx) { + if (*last_ctx) { + flush_delayed_work(&(*last_ctx)->fallback_work); + percpu_ref_put(&(*last_ctx)->refs); + } + *last_ctx = req->ctx; + percpu_ref_get(&(*last_ctx)->refs); + } + if (llist_add(&req->io_task_work.node, &req->ctx->fallback_llist)) + schedule_delayed_work(&req->ctx->fallback_work, 1); +} + +static void io_fallback_tw(struct io_uring_task *tctx, bool sync) +{ + struct llist_node *node = llist_del_all(&tctx->task_list); struct io_ring_ctx *last_ctx = NULL; struct io_kiocb *req; while (node) { req = container_of(node, struct io_kiocb, io_task_work.node); node = node->next; - if (sync && last_ctx != req->ctx) { - if (last_ctx) { - flush_delayed_work(&last_ctx->fallback_work); - percpu_ref_put(&last_ctx->refs); - } - last_ctx = req->ctx; - percpu_ref_get(&last_ctx->refs); - } - if (llist_add(&req->io_task_work.node, - &req->ctx->fallback_llist)) - schedule_delayed_work(&req->ctx->fallback_work, 1); + __io_fallback_tw(req, sync, &last_ctx); } if (last_ctx) { @@ -1093,13 +1100,6 @@ static __cold void __io_fallback_tw(struct llist_node *node, bool sync) } } -static void io_fallback_tw(struct io_uring_task *tctx, bool sync) -{ - struct llist_node *node = llist_del_all(&tctx->task_list); - - __io_fallback_tw(node, sync); -} - struct llist_node *tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, unsigned int *count) @@ -1139,65 +1139,45 @@ void tctx_task_work(struct callback_head *cb) static inline void io_req_local_work_add(struct io_kiocb *req, struct io_ring_ctx *ctx, - unsigned flags) + unsigned tw_flags) { - unsigned nr_wait, nr_tw, nr_tw_prev; - struct llist_node *head; + unsigned nr_tw, nr_tw_prev, nr_wait; + unsigned long flags; /* See comment above IO_CQ_WAKE_INIT */ 
BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES); /* - * We don't know how many reuqests is there in the link and whether - * they can even be queued lazily, fall back to non-lazy. + * We don't know how many requests are in the link and whether they can + * even be queued lazily, fall back to non-lazy. */ if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK)) - flags &= ~IOU_F_TWQ_LAZY_WAKE; + tw_flags &= ~IOU_F_TWQ_LAZY_WAKE; - guard(rcu)(); + spin_lock_irqsave(&ctx->work_lock, flags); + wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list); + nr_tw_prev = ctx->work_items++; + spin_unlock_irqrestore(&ctx->work_lock, flags); - head = READ_ONCE(ctx->work_llist.first); - do { - nr_tw_prev = 0; - if (head) { - struct io_kiocb *first_req = container_of(head, - struct io_kiocb, - io_task_work.node); - /* - * Might be executed at any moment, rely on - * SLAB_TYPESAFE_BY_RCU to keep it alive. - */ - nr_tw_prev = READ_ONCE(first_req->nr_tw); - } - - /* - * Theoretically, it can overflow, but that's fine as one of - * previous adds should've tried to wake the task. - */ - nr_tw = nr_tw_prev + 1; - if (!(flags & IOU_F_TWQ_LAZY_WAKE)) - nr_tw = IO_CQ_WAKE_FORCE; - - req->nr_tw = nr_tw; - req->io_task_work.node.next = head; - } while (!try_cmpxchg(&ctx->work_llist.first, &head, - &req->io_task_work.node)); - - /* - * cmpxchg implies a full barrier, which pairs with the barrier - * in set_current_state() on the io_cqring_wait() side. It's used - * to ensure that either we see updated ->cq_wait_nr, or waiters - * going to sleep will observe the work added to the list, which - * is similar to the wait/wawke task state sync. - */ + nr_tw = nr_tw_prev + 1; + if (!(tw_flags & IOU_F_TWQ_LAZY_WAKE)) + nr_tw = IO_CQ_WAKE_FORCE; - if (!head) { + if (!nr_tw_prev) { if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); if (ctx->has_evfd) io_eventfd_signal(ctx); } + /* + * We need a barrier after unlock, which pairs with the barrier + * in set_current_state() on the io_cqring_wait() side. It's used + * to ensure that either we see updated ->cq_wait_nr, or waiters + * going to sleep will observe the work added to the list, which + * is similar to the wait/wake task state sync. 
+ */ + smp_mb(); nr_wait = atomic_read(&ctx->cq_wait_nr); /* not enough or no one is waiting */ if (nr_tw < nr_wait) @@ -1253,11 +1233,27 @@ void io_req_task_work_add_remote(struct io_kiocb *req, struct io_ring_ctx *ctx, static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx) { - struct llist_node *node = llist_del_all(&ctx->work_llist); + struct io_ring_ctx *last_ctx = NULL; + struct io_wq_work_node *node; + unsigned long flags; - __io_fallback_tw(node, false); - node = llist_del_all(&ctx->retry_llist); - __io_fallback_tw(node, false); + spin_lock_irqsave(&ctx->work_lock, flags); + node = ctx->work_list.first; + INIT_WQ_LIST(&ctx->work_list); + ctx->work_items = 0; + spin_unlock_irqrestore(&ctx->work_lock, flags); + + while (node) { + struct io_kiocb *req; + + req = container_of(node, struct io_kiocb, io_task_work.work_node); + node = node->next; + __io_fallback_tw(req, false, &last_ctx); + } + if (last_ctx) { + flush_delayed_work(&last_ctx->fallback_work); + percpu_ref_put(&last_ctx->refs); + } } static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events, @@ -1272,52 +1268,52 @@ static bool io_run_local_work_continue(struct io_ring_ctx *ctx, int events, return false; } -static int __io_run_local_work_loop(struct llist_node **node, - struct io_tw_state *ts, - int events) -{ - while (*node) { - struct llist_node *next = (*node)->next; - struct io_kiocb *req = container_of(*node, struct io_kiocb, - io_task_work.node); - INDIRECT_CALL_2(req->io_task_work.func, - io_poll_task_func, io_req_rw_complete, - req, ts); - *node = next; - if (--events <= 0) - break; - } - - return events; -} - static int __io_run_local_work(struct io_ring_ctx *ctx, struct io_tw_state *ts, int min_events) { - struct llist_node *node; + struct io_wq_work_node *node, *tail; + int ret, limit, nitems; unsigned int loops = 0; - int ret, limit; if (WARN_ON_ONCE(ctx->submitter_task != current)) return -EEXIST; if (ctx->flags & IORING_SETUP_TASKRUN_FLAG) atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags); + ret = 0; limit = max(IO_LOCAL_TW_DEFAULT_MAX, min_events); again: - ret = __io_run_local_work_loop(&ctx->retry_llist.first, ts, limit); - if (ctx->retry_llist.first) + spin_lock_irq(&ctx->work_lock); + node = ctx->work_list.first; + tail = ctx->work_list.last; + nitems = ctx->work_items; + INIT_WQ_LIST(&ctx->work_list); + ctx->work_items = 0; + spin_unlock_irq(&ctx->work_lock); + + while (node) { + struct io_kiocb *req = container_of(node, struct io_kiocb, + io_task_work.work_node); + node = node->next; + INDIRECT_CALL_2(req->io_task_work.func, + io_poll_task_func, io_req_rw_complete, + req, ts); + nitems--; + if (++ret >= limit) + break; + } + + if (unlikely(node)) { + spin_lock_irq(&ctx->work_lock); + tail->next = ctx->work_list.first; + ctx->work_list.first = node; + if (!ctx->work_list.last) + ctx->work_list.last = tail; + ctx->work_items += nitems; + spin_unlock_irq(&ctx->work_lock); goto retry_done; + } - /* - * llists are in reverse order, flip it back the right way before - * running the pending items. 
- */ - node = llist_reverse_order(llist_del_all(&ctx->work_llist)); - ret = __io_run_local_work_loop(&node, ts, ret); - ctx->retry_llist.first = node; loops++; - - ret = limit - ret; if (io_run_local_work_continue(ctx, ret, min_events)) goto again; retry_done: @@ -2413,7 +2409,7 @@ static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer) if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) { atomic_set(&ctx->cq_wait_nr, 1); smp_mb(); - if (!llist_empty(&ctx->work_llist)) + if (io_local_work_pending(ctx)) goto out_wake; } diff --git a/io_uring/io_uring.h b/io_uring/io_uring.h index 214f9f175102..2fae27803116 100644 --- a/io_uring/io_uring.h +++ b/io_uring/io_uring.h @@ -349,7 +349,7 @@ static inline int io_run_task_work(void) static inline bool io_local_work_pending(struct io_ring_ctx *ctx) { - return !llist_empty(&ctx->work_llist) || !llist_empty(&ctx->retry_llist); + return READ_ONCE(ctx->work_list.first); } static inline bool io_task_work_pending(struct io_ring_ctx *ctx)
Add a spinlock for the list, and replace the lockless llist with the
work list instead. This avoids needing to reverse items in the list
before running them, as the io_wq_work_list is FIFO by nature whereas
the llist is LIFO.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
---
 include/linux/io_uring_types.h |  13 ++-
 io_uring/io_uring.c            | 194 ++++++++++++++++-----------------
 io_uring/io_uring.h            |   2 +-
 3 files changed, 104 insertions(+), 105 deletions(-)
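To illustrate the FIFO/LIFO point, a simplified before/after fragment (not verbatim from the patch): the llist producer pushes at the head, so the consumer had to reverse each batch, while the io_wq_work_list producer appends at the tail under ctx->work_lock, so the consumer can walk the spliced-out list in submission order directly.

        /* before: LIFO push, consumer reverses the batch before running it */
        llist_add(&req->io_task_work.node, &ctx->work_llist);
        node = llist_reverse_order(llist_del_all(&ctx->work_llist));

        /* after: FIFO append under the lock ... */
        spin_lock_irq(&ctx->work_lock);
        wq_list_add_tail(&req->io_task_work.work_node, &ctx->work_list);
        spin_unlock_irq(&ctx->work_lock);

        /* ... and the consumer splices the list out and walks ->first in order */
        spin_lock_irq(&ctx->work_lock);
        node = ctx->work_list.first;            /* oldest entry first */
        INIT_WQ_LIST(&ctx->work_list);
        ctx->work_items = 0;
        spin_unlock_irq(&ctx->work_lock);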