@@ -330,11 +330,16 @@ struct io_submit_state {
struct io_tx_notifier {
struct ubuf_info uarg;
- struct work_struct commit_work;
struct percpu_ref *fixed_rsrc_refs;
u64 tag;
u32 seq;
struct list_head cache_node;
+ struct task_struct *task;
+
+ union {
+ struct callback_head task_work;
+ struct work_struct commit_work;
+ };
};
struct io_tx_ctx {
@@ -1965,19 +1970,17 @@ static noinline bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data,
return __io_fill_cqe(ctx, user_data, res, cflags);
}
-static void io_zc_tx_work_callback(struct work_struct *work)
+static void io_zc_tx_notifier_finish(struct callback_head *cb)
{
- struct io_tx_notifier *notifier = container_of(work, struct io_tx_notifier,
- commit_work);
+ struct io_tx_notifier *notifier = container_of(cb, struct io_tx_notifier,
+ task_work);
struct io_ring_ctx *ctx = notifier->uarg.ctx;
struct percpu_ref *rsrc_refs = notifier->fixed_rsrc_refs;
spin_lock(&ctx->completion_lock);
io_fill_cqe_aux(ctx, notifier->tag, notifier->seq, 0);
-
list_add(&notifier->cache_node, &ctx->ubuf_list_locked);
ctx->ubuf_locked_nr++;
-
io_commit_cqring(ctx);
spin_unlock(&ctx->completion_lock);
io_cqring_ev_posted(ctx);
@@ -1985,6 +1988,14 @@ static void io_zc_tx_work_callback(struct work_struct *work)
percpu_ref_put(rsrc_refs);
}
+static void io_zc_tx_work_callback(struct work_struct *work)
+{
+ struct io_tx_notifier *notifier = container_of(work, struct io_tx_notifier,
+ commit_work);
+
+ io_zc_tx_notifier_finish(&notifier->task_work);
+}
+
static void io_uring_tx_zerocopy_callback(struct sk_buff *skb,
struct ubuf_info *uarg,
bool success)
@@ -1994,21 +2005,39 @@ static void io_uring_tx_zerocopy_callback(struct sk_buff *skb,
if (!refcount_dec_and_test(&uarg->refcnt))
return;
+ if (unlikely(!notifier->task))
+ goto fallback;
- if (in_interrupt()) {
- INIT_WORK(&notifier->commit_work, io_zc_tx_work_callback);
- queue_work(system_unbound_wq, &notifier->commit_work);
- } else {
- io_zc_tx_work_callback(&notifier->commit_work);
+
+ if (!in_interrupt()) {
+ put_task_struct(notifier->task);
+ notifier->task = NULL;
+ io_zc_tx_notifier_finish(&notifier->task_work);
+ return;
}
+
+ init_task_work(&notifier->task_work, io_zc_tx_notifier_finish);
+ if (likely(!task_work_add(notifier->task, &notifier->task_work,
+ TWA_SIGNAL)))
+ return;
+
+fallback:
+ INIT_WORK(&notifier->commit_work, io_zc_tx_work_callback);
+ queue_work(system_unbound_wq, &notifier->commit_work);
}
-static void io_tx_kill_notification(struct io_tx_ctx *tx_ctx)
+static inline void __io_tx_kill_notification(struct io_tx_ctx *tx_ctx)
{
io_uring_tx_zerocopy_callback(NULL, &tx_ctx->notifier->uarg, true);
tx_ctx->notifier = NULL;
}
+static inline void io_tx_kill_notification(struct io_tx_ctx *tx_ctx)
+{
+ tx_ctx->notifier->task = get_task_struct(current);
+ __io_tx_kill_notification(tx_ctx);
+}
+
static void io_notifier_splice(struct io_ring_ctx *ctx)
{
spin_lock(&ctx->completion_lock);
@@ -2058,7 +2087,7 @@ static struct io_tx_notifier *io_alloc_tx_notifier(struct io_ring_ctx *ctx,
} else {
gfp_t gfp_flags = GFP_ATOMIC|GFP_KERNEL_ACCOUNT;
- notifier = kmalloc(sizeof(*notifier), gfp_flags);
+ notifier = kzalloc(sizeof(*notifier), gfp_flags);
if (!notifier)
return NULL;
ctx->nr_tx_ctx++;
@@ -9502,7 +9531,7 @@ static void io_sqe_tx_ctx_kill_ubufs(struct io_ring_ctx *ctx)
tx_ctx = &ctx->tx_ctxs[i];
if (tx_ctx->notifier)
- io_tx_kill_notification(tx_ctx);
+ __io_tx_kill_notification(tx_ctx);
}
}
workqueues are way too heavy for tx notification delivery. We still
need some non-irq context because ->completion_lock is not irq-safe,
so use task_work instead. As expected, for test cases with real
hardware juggling lots of notifications, performance is drastically
better, e.g. the profile percentage of the relevant parts drops from
30% to less than 3%.

Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
---
 fs/io_uring.c | 57 ++++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 43 insertions(+), 14 deletions(-)
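
For reference, below is a condensed sketch of the delivery scheme the patch
switches to: finish inline when already in process context, otherwise punt
to the submitting task via task_work, and fall back to system_unbound_wq
only when task_work_add() fails (e.g. the task is exiting). The names
(tx_done, tx_done_deliver, post_completion) are hypothetical and the CQE
posting is stubbed out; this is an illustration of the pattern, not the
actual fs/io_uring.c code.

/*
 * Illustrative sketch only, hypothetical names. Only the task_work vs.
 * workqueue plumbing mirrors the patch; completion posting is a stub.
 */
#include <linux/hardirq.h>
#include <linux/sched/task.h>
#include <linux/task_work.h>
#include <linux/workqueue.h>

struct tx_done {
	struct task_struct *task;	/* submitter, pinned when armed */
	union {				/* only one deferral path is used */
		struct callback_head task_work;
		struct work_struct commit_work;
	};
};

static void post_completion(struct tx_done *d);	/* hypothetical stub */

/* Always runs in process context, so a non-irq-safe lock is usable here. */
static void tx_done_finish(struct callback_head *cb)
{
	struct tx_done *d = container_of(cb, struct tx_done, task_work);

	post_completion(d);
}

static void tx_done_wq(struct work_struct *work)
{
	struct tx_done *d = container_of(work, struct tx_done, commit_work);

	tx_done_finish(&d->task_work);
}

/* Called once the last reference to the notification is dropped. */
static void tx_done_deliver(struct tx_done *d)
{
	struct task_struct *task = d->task;

	d->task = NULL;

	if (!in_interrupt()) {
		/* already in a usable context, complete inline */
		if (task)
			put_task_struct(task);
		tx_done_finish(&d->task_work);
		return;
	}

	if (task) {
		int ret;

		init_task_work(&d->task_work, tx_done_finish);
		ret = task_work_add(task, &d->task_work, TWA_SIGNAL);
		put_task_struct(task);	/* ref only needed across the add */
		if (!ret)
			return;		/* tx_done_finish() runs on that task */
	}

	/* no task, or it is exiting: fall back to the heavier workqueue */
	INIT_WORK(&d->commit_work, tx_done_wq);
	queue_work(system_unbound_wq, &d->commit_work);
}

Holding the task reference across task_work_add() is what makes the
irq-context handoff safe: queued callbacks are run from task_work_run()
at the latest when the task exits, so the reference can be dropped as
soon as the work is queued.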