@@ -240,6 +240,21 @@ struct aio_kiocb {
};
};
+struct aio_submit_state {
+ struct kioctx *ctx;
+
+ struct blk_plug plug;
+#ifdef CONFIG_BLOCK
+ struct blk_plug_cb plug_cb;
+#endif
+
+ /*
+ * Polled iocbs that have been submitted, but not added to the ctx yet
+ */
+ struct list_head req_list;
+ unsigned int req_count;
+};
+
/*------ sysctl variables----*/
static DEFINE_SPINLOCK(aio_nr_lock);
unsigned long aio_nr; /* current system wide number of aio requests */
@@ -257,6 +272,15 @@ static const struct address_space_operations aio_ctx_aops;
static const unsigned int iocb_page_shift =
ilog2(PAGE_SIZE / sizeof(struct iocb));
+/*
+ * We rely on block level unplugs to flush pending requests, if we schedule
+ */
+#ifdef CONFIG_BLOCK
+static const bool aio_use_state_req_list = true;
+#else
+static const bool aio_use_state_req_list = false;
+#endif
+
static void aio_useriocb_unmap(struct kioctx *);
static void aio_iopoll_reap_events(struct kioctx *);
@@ -1901,13 +1925,28 @@ static inline void aio_rw_done(struct kiocb *req, ssize_t ret)
}
}
+/*
+ * Called either at the end of IO submission, or through a plug callback
+ * because we're going to schedule. Moves out local batch of requests to
+ * the ctx poll list, so they can be found for polling + reaping.
+ */
+static void aio_flush_state_reqs(struct kioctx *ctx,
+ struct aio_submit_state *state)
+{
+ spin_lock(&ctx->poll_lock);
+ list_splice_tail_init(&state->req_list, &ctx->poll_submitted);
+ spin_unlock(&ctx->poll_lock);
+ state->req_count = 0;
+}
+
/*
* After the iocb has been issued, it's safe to be found on the poll list.
* Adding the kiocb to the list AFTER submission ensures that we don't
* find it from a io_getevents() thread before the issuer is done accessing
* the kiocb cookie.
*/
-static void aio_iopoll_iocb_issued(struct aio_kiocb *kiocb)
+static void aio_iopoll_iocb_issued(struct aio_submit_state *state,
+ struct aio_kiocb *kiocb)
{
/*
* For fast devices, IO may have already completed. If it has, add
@@ -1917,12 +1956,21 @@ static void aio_iopoll_iocb_issued(struct aio_kiocb *kiocb)
const int front = test_bit(KIOCB_F_POLL_COMPLETED, &kiocb->ki_flags);
struct kioctx *ctx = kiocb->ki_ctx;
- spin_lock(&ctx->poll_lock);
- if (front)
- list_add(&kiocb->ki_list, &ctx->poll_submitted);
- else
- list_add_tail(&kiocb->ki_list, &ctx->poll_submitted);
- spin_unlock(&ctx->poll_lock);
+ if (!state || !aio_use_state_req_list) {
+ spin_lock(&ctx->poll_lock);
+ if (front)
+ list_add(&kiocb->ki_list, &ctx->poll_submitted);
+ else
+ list_add_tail(&kiocb->ki_list, &ctx->poll_submitted);
+ spin_unlock(&ctx->poll_lock);
+ } else {
+ if (front)
+ list_add(&kiocb->ki_list, &state->req_list);
+ else
+ list_add_tail(&kiocb->ki_list, &state->req_list);
+ if (++state->req_count >= AIO_IOPOLL_BATCH)
+ aio_flush_state_reqs(ctx, state);
+ }
}
static ssize_t aio_read(struct aio_kiocb *kiocb, const struct iocb *iocb,
@@ -2218,7 +2266,8 @@ static ssize_t aio_poll(struct aio_kiocb *aiocb, const struct iocb *iocb)
}
static int __io_submit_one(struct kioctx *ctx, const struct iocb *iocb,
- struct iocb __user *user_iocb, bool compat)
+ struct iocb __user *user_iocb,
+ struct aio_submit_state *state, bool compat)
{
struct aio_kiocb *req;
ssize_t ret;
@@ -2322,7 +2371,7 @@ static int __io_submit_one(struct kioctx *ctx, const struct iocb *iocb,
ret = -EAGAIN;
goto out_put_req;
}
- aio_iopoll_iocb_issued(req);
+ aio_iopoll_iocb_issued(state, req);
}
return 0;
out_put_req:
@@ -2336,7 +2385,7 @@ static int __io_submit_one(struct kioctx *ctx, const struct iocb *iocb,
}
static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
- bool compat)
+ struct aio_submit_state *state, bool compat)
{
struct iocb iocb, *iocbp;
@@ -2353,7 +2402,44 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
iocbp = &iocb;
}
- return __io_submit_one(ctx, iocbp, user_iocb, compat);
+ return __io_submit_one(ctx, iocbp, user_iocb, state, compat);
+}
+
+#ifdef CONFIG_BLOCK
+static void aio_state_unplug(struct blk_plug_cb *cb, bool from_schedule)
+{
+ struct aio_submit_state *state;
+
+ state = container_of(cb, struct aio_submit_state, plug_cb);
+ if (!list_empty(&state->req_list))
+ aio_flush_state_reqs(state->ctx, state);
+}
+#endif
+
+/*
+ * Batched submission is done, ensure local IO is flushed out.
+ */
+static void aio_submit_state_end(struct aio_submit_state *state)
+{
+ blk_finish_plug(&state->plug);
+ if (!list_empty(&state->req_list))
+ aio_flush_state_reqs(state->ctx, state);
+}
+
+/*
+ * Start submission side cache.
+ */
+static void aio_submit_state_start(struct aio_submit_state *state,
+ struct kioctx *ctx)
+{
+ state->ctx = ctx;
+ INIT_LIST_HEAD(&state->req_list);
+ state->req_count = 0;
+#ifdef CONFIG_BLOCK
+ state->plug_cb.callback = aio_state_unplug;
+ blk_start_plug(&state->plug);
+ list_add(&state->plug_cb.list, &state->plug.cb_list);
+#endif
}
/* sys_io_submit:
@@ -2371,10 +2457,10 @@ static int io_submit_one(struct kioctx *ctx, struct iocb __user *user_iocb,
SYSCALL_DEFINE3(io_submit, aio_context_t, ctx_id, long, nr,
struct iocb __user * __user *, iocbpp)
{
+ struct aio_submit_state state, *statep = NULL;
struct kioctx *ctx;
long ret = 0;
int i = 0;
- struct blk_plug plug;
if (unlikely(nr < 0))
return -EINVAL;
@@ -2388,8 +2474,10 @@ SYSCALL_DEFINE3(io_submit, aio_context_t, ctx_id, long, nr,
if (nr > ctx->nr_events)
nr = ctx->nr_events;
- if (nr > AIO_PLUG_THRESHOLD)
- blk_start_plug(&plug);
+ if (nr > AIO_PLUG_THRESHOLD) {
+ aio_submit_state_start(&state, ctx);
+ statep = &state;
+ }
for (i = 0; i < nr; i++) {
struct iocb __user *user_iocb;
@@ -2398,12 +2486,12 @@ SYSCALL_DEFINE3(io_submit, aio_context_t, ctx_id, long, nr,
break;
}
- ret = io_submit_one(ctx, user_iocb, false);
+ ret = io_submit_one(ctx, user_iocb, statep, false);
if (ret)
break;
}
- if (nr > AIO_PLUG_THRESHOLD)
- blk_finish_plug(&plug);
+ if (statep)
+ aio_submit_state_end(statep);
percpu_ref_put(&ctx->users);
return i ? i : ret;
@@ -2413,10 +2501,10 @@ SYSCALL_DEFINE3(io_submit, aio_context_t, ctx_id, long, nr,
COMPAT_SYSCALL_DEFINE3(io_submit, compat_aio_context_t, ctx_id,
int, nr, compat_uptr_t __user *, iocbpp)
{
+ struct aio_submit_state state, *statep = NULL;
struct kioctx *ctx;
long ret = 0;
int i = 0;
- struct blk_plug plug;
if (unlikely(nr < 0))
return -EINVAL;
@@ -2430,8 +2518,10 @@ COMPAT_SYSCALL_DEFINE3(io_submit, compat_aio_context_t, ctx_id,
if (nr > ctx->nr_events)
nr = ctx->nr_events;
- if (nr > AIO_PLUG_THRESHOLD)
- blk_start_plug(&plug);
+ if (nr > AIO_PLUG_THRESHOLD) {
+ aio_submit_state_start(&state, ctx);
+ statep = &state;
+ }
for (i = 0; i < nr; i++) {
compat_uptr_t user_iocb;
@@ -2440,12 +2530,12 @@ COMPAT_SYSCALL_DEFINE3(io_submit, compat_aio_context_t, ctx_id,
break;
}
- ret = io_submit_one(ctx, compat_ptr(user_iocb), true);
+ ret = io_submit_one(ctx, compat_ptr(user_iocb), statep, true);
if (ret)
break;
}
- if (nr > AIO_PLUG_THRESHOLD)
- blk_finish_plug(&plug);
+ if (statep)
+ aio_submit_state_end(statep);
percpu_ref_put(&ctx->users);
return i ? i : ret;
We have to add each submitted polled request to the io_context poll_submitted list, which means we have to grab the poll_lock. We already use the block plug to batch submissions if we're doing a batch of IO submissions, extend that to cover the poll requests internally as well. Signed-off-by: Jens Axboe <axboe@kernel.dk> --- fs/aio.c | 136 +++++++++++++++++++++++++++++++++++++++++++++---------- 1 file changed, 113 insertions(+), 23 deletions(-)