[RFC,v2,18/19] io_uring: task_work for notification delivery

Message ID 33b943a2409dc1c4ad845ea0bebb76ecad723ef6.1640029579.git.asml.silence@gmail.com
State RFC
Series io_uring zerocopy tx

Checks

Context:     netdev/tree_selection
Check:       success
Description: Guessing tree name failed - patch did not apply, async

Commit Message

Pavel Begunkov Dec. 21, 2021, 3:35 p.m. UTC
Workqueues are way too heavy for tx notification delivery. We still
need some non-irq context because ->completion_lock is not irq-safe,
so use task_work instead. As expected, performance for test cases
with real hardware juggling lots of notifications is drastically
better, e.g. the percentage of the relevant parts in profiles drops
from 30% to less than 3%.

Signed-off-by: Pavel Begunkov <asml.silence@gmail.com>
---
 fs/io_uring.c | 61 ++++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 48 insertions(+), 13 deletions(-)
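
The gist of the new scheme, spelled out as a condensed sketch (not code
from this patch: the my_* names are made up, error handling and all
io_uring specifics are trimmed to the skeleton, and it is not meant to
build outside a kernel tree):

/* Try to run the completion on the submitting task via task_work;
 * fall back to a workqueue only when that is impossible.
 */
#include <linux/hardirq.h>
#include <linux/sched/task.h>
#include <linux/task_work.h>
#include <linux/workqueue.h>

struct my_notifier {
	struct task_struct	*task;	/* submitter, reference held */
	union {		/* the two deferral paths are mutually exclusive */
		struct callback_head	task_work;
		struct work_struct	commit_work;
	};
};

static void my_finish(struct callback_head *cb)
{
	/* task context: safe to take the non-irq-safe ->completion_lock,
	 * post the CQE and recycle the notifier here
	 */
}

static void my_wq_cb(struct work_struct *work)
{
	struct my_notifier *n = container_of(work, struct my_notifier,
					     commit_work);

	my_finish(&n->task_work);
}

static void my_complete(struct my_notifier *n)
{
	struct task_struct *task = n->task;

	n->task = NULL;
	if (!in_interrupt()) {
		/* already in task context, complete inline */
		put_task_struct(task);
		my_finish(&n->task_work);
		return;
	}
	init_task_work(&n->task_work, my_finish);
	if (!task_work_add(task, &n->task_work, TWA_SIGNAL)) {
		/* runs when the task next returns to userspace */
		put_task_struct(task);
		return;
	}
	/* the task is exiting: use the heavier workqueue path */
	put_task_struct(task);
	INIT_WORK(&n->commit_work, my_wq_cb);
	queue_work(system_unbound_wq, &n->commit_work);
}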

Patch

diff --git a/fs/io_uring.c b/fs/io_uring.c
index 8cfa8ea161e4..ee496b463462 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -330,11 +330,16 @@  struct io_submit_state {
 
 struct io_tx_notifier {
 	struct ubuf_info	uarg;
-	struct work_struct	commit_work;
 	struct percpu_ref	*fixed_rsrc_refs;
 	u64			tag;
 	u32			seq;
 	struct list_head	cache_node;
+	struct task_struct	*task;
+
+	union {
+		struct callback_head	task_work;
+		struct work_struct	commit_work;
+	};
 };
 
 struct io_tx_ctx {
@@ -1965,19 +1970,17 @@  static noinline bool io_fill_cqe_aux(struct io_ring_ctx *ctx, u64 user_data,
 	return __io_fill_cqe(ctx, user_data, res, cflags);
 }
 
-static void io_zc_tx_work_callback(struct work_struct *work)
+static void io_zc_tx_notifier_finish(struct callback_head *cb)
 {
-	struct io_tx_notifier *notifier = container_of(work, struct io_tx_notifier,
-						       commit_work);
+	struct io_tx_notifier *notifier = container_of(cb, struct io_tx_notifier,
+						       task_work);
 	struct io_ring_ctx *ctx = notifier->uarg.ctx;
 	struct percpu_ref *rsrc_refs = notifier->fixed_rsrc_refs;
 
 	spin_lock(&ctx->completion_lock);
 	io_fill_cqe_aux(ctx, notifier->tag, notifier->seq, 0);
-
 	list_add(&notifier->cache_node, &ctx->ubuf_list_locked);
 	ctx->ubuf_locked_nr++;
-
 	io_commit_cqring(ctx);
 	spin_unlock(&ctx->completion_lock);
 	io_cqring_ev_posted(ctx);
@@ -1985,6 +1988,14 @@  static void io_zc_tx_work_callback(struct work_struct *work)
 	percpu_ref_put(rsrc_refs);
 }
 
+static void io_zc_tx_work_callback(struct work_struct *work)
+{
+	struct io_tx_notifier *notifier = container_of(work, struct io_tx_notifier,
+						       commit_work);
+
+	io_zc_tx_notifier_finish(&notifier->task_work);
+}
+
 static void io_uring_tx_zerocopy_callback(struct sk_buff *skb,
 					  struct ubuf_info *uarg,
 					  bool success)
@@ -1994,21 +2005,46 @@  static void io_uring_tx_zerocopy_callback(struct sk_buff *skb,
 
 	if (!refcount_dec_and_test(&uarg->refcnt))
 		return;
+	if (unlikely(!notifier->task))
+		goto fallback;
 
 	if (in_interrupt()) {
-		INIT_WORK(&notifier->commit_work, io_zc_tx_work_callback);
-		queue_work(system_unbound_wq, &notifier->commit_work);
-	} else {
-		io_zc_tx_work_callback(&notifier->commit_work);
+		struct task_struct *task = notifier->task;
+
+		notifier->task = NULL;
+		init_task_work(&notifier->task_work, io_zc_tx_notifier_finish);
+		if (unlikely(task_work_add(task, &notifier->task_work,
+					   TWA_SIGNAL))) {
+			put_task_struct(task);
+			goto fallback;
+		}
+		/* the task_work may already be running, don't touch notifier */
+		put_task_struct(task);
+		return;
 	}
+
+	put_task_struct(notifier->task);
+	notifier->task = NULL;
+	io_zc_tx_notifier_finish(&notifier->task_work);
+	return;
+
+fallback:
+	INIT_WORK(&notifier->commit_work, io_zc_tx_work_callback);
+	queue_work(system_unbound_wq, &notifier->commit_work);
 }
 
-static void io_tx_kill_notification(struct io_tx_ctx *tx_ctx)
+static inline void __io_tx_kill_notification(struct io_tx_ctx *tx_ctx)
 {
 	io_uring_tx_zerocopy_callback(NULL, &tx_ctx->notifier->uarg, true);
 	tx_ctx->notifier = NULL;
 }
 
+static inline void io_tx_kill_notification(struct io_tx_ctx *tx_ctx)
+{
+	tx_ctx->notifier->task = get_task_struct(current);
+	__io_tx_kill_notification(tx_ctx);
+}
+
 static void io_notifier_splice(struct io_ring_ctx *ctx)
 {
 	spin_lock(&ctx->completion_lock);
@@ -2058,7 +2087,7 @@  static struct io_tx_notifier *io_alloc_tx_notifier(struct io_ring_ctx *ctx,
 	} else {
 		gfp_t gfp_flags = GFP_ATOMIC|GFP_KERNEL_ACCOUNT;
 
-		notifier = kmalloc(sizeof(*notifier), gfp_flags);
+		notifier = kzalloc(sizeof(*notifier), gfp_flags);
 		if (!notifier)
 			return NULL;
 		ctx->nr_tx_ctx++;
@@ -9502,7 +9531,7 @@  static void io_sqe_tx_ctx_kill_ubufs(struct io_ring_ctx *ctx)
 		tx_ctx = &ctx->tx_ctxs[i];
 
 		if (tx_ctx->notifier)
-			io_tx_kill_notification(tx_ctx);
+			__io_tx_kill_notification(tx_ctx);
 	}
 }