@@ -104,11 +104,17 @@ struct async_list {
size_t io_pages;
};
+#define INLINE_REQS_TOTAL 128
+
struct io_ring_ctx {
struct {
struct percpu_ref refs;
} ____cacheline_aligned_in_smp;
+ struct io_kiocb *inline_req_array;
+ struct list_head inline_req_list;
+ unsigned int inline_available_reqs;
+
struct {
unsigned int flags;
bool compat;
@@ -226,9 +232,9 @@ struct io_kiocb {
#define REQ_F_FIXED_FILE 4 /* ctx owns file */
#define REQ_F_SEQ_PREV 8 /* sequential with previous */
#define REQ_F_PREPPED 16 /* prep already done */
+#define REQ_F_INLINE 32 /* ctx inline req */
u64 user_data;
u64 error;
-
struct work_struct work;
};
@@ -396,6 +402,23 @@ static void io_ring_drop_ctx_refs(struct io_ring_ctx *ctx, unsigned refs)
wake_up(&ctx->wait);
}
+static struct io_kiocb *io_get_inline_req(struct io_ring_ctx *ctx)
+{
+ struct io_kiocb *req;
+
+ req = list_first_entry(&ctx->inline_req_list, struct io_kiocb, list);
+ list_del_init(&req->list);
+ ctx->inline_available_reqs--;
+ return req;
+}
+
+static void io_free_inline_req(struct io_ring_ctx *ctx, struct io_kiocb *req)
+{
+ INIT_LIST_HEAD(&req->list);
+ list_add(&req->list, &ctx->inline_req_list);
+ ctx->inline_available_reqs++;
+}
+
static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
struct io_submit_state *state)
{
@@ -405,10 +428,14 @@ static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
if (!percpu_ref_tryget(&ctx->refs))
return NULL;
- if (!state) {
+ if (ctx->inline_available_reqs) {
+ req = io_get_inline_req(ctx);
+ req->flags = REQ_F_INLINE;
+ } else if (!state) {
req = kmem_cache_alloc(req_cachep, gfp);
if (unlikely(!req))
goto out;
+ req->flags = 0;
} else if (!state->free_reqs) {
size_t sz;
int ret;
@@ -429,14 +456,15 @@ static struct io_kiocb *io_get_req(struct io_ring_ctx *ctx,
state->free_reqs = ret - 1;
state->cur_req = 1;
req = state->reqs[0];
+ req->flags = 0;
} else {
req = state->reqs[state->cur_req];
state->free_reqs--;
state->cur_req++;
+ req->flags = 0;
}
req->ctx = ctx;
- req->flags = 0;
/* one is dropped after submission, the other at completion */
refcount_set(&req->refs, 2);
return req;
@@ -456,10 +484,15 @@ static void io_free_req_many(struct io_ring_ctx *ctx, void **reqs, int *nr)
static void io_free_req(struct io_kiocb *req)
{
+ struct io_ring_ctx *ctx = req->ctx;
+
if (req->file && !(req->flags & REQ_F_FIXED_FILE))
fput(req->file);
io_ring_drop_ctx_refs(req->ctx, 1);
- kmem_cache_free(req_cachep, req);
+ if (req->flags & REQ_F_INLINE)
+ io_free_inline_req(ctx, req);
+ else
+ kmem_cache_free(req_cachep, req);
}
static void io_put_req(struct io_kiocb *req)
@@ -492,7 +525,7 @@ static void io_iopoll_complete(struct io_ring_ctx *ctx, unsigned int *nr_events,
* completions for those, only batch free for fixed
* file.
*/
- if (req->flags & REQ_F_FIXED_FILE) {
+ if (!(req->flags & REQ_F_INLINE) && req->flags & REQ_F_FIXED_FILE) {
reqs[to_free++] = req;
if (to_free == ARRAY_SIZE(reqs))
io_free_req_many(ctx, reqs, &to_free);
@@ -1593,6 +1626,45 @@ static int io_req_set_file(struct io_ring_ctx *ctx, const struct sqe_submit *s,
return 0;
}
+static int io_punt_req_to_work(struct io_ring_ctx *ctx, struct sqe_submit *s,
+ struct io_kiocb *req)
+{
+ struct io_uring_sqe *sqe_copy;
+ struct io_kiocb *req_copy;
+ struct async_list *list;
+
+ sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
+ if (!sqe_copy)
+ return -ENOMEM;
+
+ if (req->flags & REQ_F_INLINE) {
+ req_copy = kmem_cache_alloc(req_cachep, GFP_KERNEL);
+ if (!req_copy) {
+ kfree(sqe_copy);
+ return -ENOMEM;
+ }
+
+ memcpy(req_copy, req, sizeof(*req));
+ req_copy->flags &= ~REQ_F_INLINE;
+ io_free_inline_req(ctx, req);
+ req = req_copy;
+ }
+
+ memcpy(sqe_copy, s->sqe, sizeof(*sqe_copy));
+ s->sqe = sqe_copy;
+
+ memcpy(&req->submit, s, sizeof(*s));
+ list = io_async_list_from_sqe(ctx, s->sqe);
+ if (!io_add_to_prev_work(list, req)) {
+ if (list)
+ atomic_inc(&list->cnt);
+ INIT_WORK(&req->work, io_sq_wq_submit_work);
+ queue_work(ctx->sqo_wq, &req->work);
+ }
+
+ return 0;
+}
+
static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
struct io_submit_state *state)
{
@@ -1613,31 +1685,13 @@ static int io_submit_sqe(struct io_ring_ctx *ctx, struct sqe_submit *s,
ret = __io_submit_sqe(ctx, req, s, true, state);
if (ret == -EAGAIN) {
- struct io_uring_sqe *sqe_copy;
-
- sqe_copy = kmalloc(sizeof(*sqe_copy), GFP_KERNEL);
- if (sqe_copy) {
- struct async_list *list;
-
- memcpy(sqe_copy, s->sqe, sizeof(*sqe_copy));
- s->sqe = sqe_copy;
-
- memcpy(&req->submit, s, sizeof(*s));
- list = io_async_list_from_sqe(ctx, s->sqe);
- if (!io_add_to_prev_work(list, req)) {
- if (list)
- atomic_inc(&list->cnt);
- INIT_WORK(&req->work, io_sq_wq_submit_work);
- queue_work(ctx->sqo_wq, &req->work);
- }
-
- /*
- * Queued up for async execution, worker will release
- * submit reference when the iocb is actually
- * submitted.
- */
+ /*
+ * Queued up for async execution, worker will release
+ * submit reference when the iocb is actually
+ * submitted.
+ */
+ if (!io_punt_req_to_work(ctx, s, req))
return 0;
- }
}
out:
@@ -2520,6 +2574,9 @@ static void io_ring_ctx_free(struct io_ring_ctx *ctx)
sock_release(ctx->ring_sock);
#endif
+ if (ctx->inline_req_array)
+ kfree(ctx->inline_req_array);
+
io_mem_free(ctx->sq_ring);
io_mem_free(ctx->sq_sqes);
io_mem_free(ctx->cq_ring);
@@ -2783,7 +2840,7 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p)
struct user_struct *user = NULL;
struct io_ring_ctx *ctx;
bool account_mem;
- int ret;
+ int ret, i;
if (!entries || entries > IORING_MAX_ENTRIES)
return -EINVAL;
@@ -2817,6 +2874,31 @@ static int io_uring_create(unsigned entries, struct io_uring_params *p)
free_uid(user);
return -ENOMEM;
}
+
+ /*
+ * When IORING_SETUP_IOPOLL and direct_io, all of submit and
+ * completion are handled under ctx->uring_lock or in SQ poll
+ * thread context, so io_get_req and io_put_req are serialized
+ * well. we could update inline_reqs_h and inline_reqs_t w/o any
+ * lock and benefit from the inline reqs.
+ */
+ ctx->inline_available_reqs = 0;
+ if (ctx->flags & IORING_SETUP_IOPOLL) {
+ ctx->inline_req_array = kmalloc(
+ sizeof(struct io_kiocb) * INLINE_REQS_TOTAL,
+ GFP_KERNEL);
+ if (ctx->inline_req_array) {
+ struct io_kiocb *req;
+ INIT_LIST_HEAD(&ctx->inline_req_list);
+ for (i = 0; i < INLINE_REQS_TOTAL; i++) {
+ req = ctx->inline_req_array + i;
+ INIT_LIST_HEAD(&req->list);
+ list_add_tail(&req->list, &ctx->inline_req_list);
+ }
+ ctx->inline_available_reqs = INLINE_REQS_TOTAL;
+ }
+ }
+
ctx->compat = in_compat_syscall();
ctx->account_mem = account_mem;
ctx->user = user;
For the IORING_SETUP_IOPOLL case, all of the submission and completion are handled under ctx->uring_lock or in SQ poll thread context, so io_get_req and io_put_req has been serialized well. The only exception is the asynchronous workqueue context where could free the io_kiocb for error. To overcome this, we allocate a new io_kiocb and free the previous inlined one. Based on this, we introduce the preallocated reqs list per ctx and needn't to provide any lock to serialize the updating of list. The performacne benefits from this. The test result of following fio command fio --name=io_uring_test --ioengine=io_uring --hipri --fixedbufs --iodepth=16 --direct=1 --numjobs=1 --filename=/dev/nvme0n1 --bs=4k --group_reporting --runtime=10 shows IOPS upgrade from 197K to 206K. Signed-off-by: Jianchao Wang <jianchao.w.wang@oracle.com> --- V2 -> V1: - use list to maintian the preallocated io_kiocb - allocate a new io_kiocb when punt io to workqueue context fs/io_uring.c | 142 +++++++++++++++++++++++++++++++++++++++++++++------------- 1 file changed, 112 insertions(+), 30 deletions(-)