@@ -344,6 +344,7 @@
333 common io_pgetevents __x64_sys_io_pgetevents
334 common rseq __x64_sys_rseq
335 common io_setup2 __x64_sys_io_setup2
+336 common io_ring_enter __x64_sys_io_ring_enter
#
# x32-specific system call numbers start at 512 to avoid cache impact
@@ -92,6 +92,18 @@ struct ctx_rq_wait {
atomic_t count;
};
+struct aio_mapped_range {
+ struct page **pages;
+ long nr_pages;
+};
+
+struct aio_iocb_ring {
+ struct aio_mapped_range ring_range; /* maps user SQ ring */
+ struct aio_sq_ring *ring;
+
+ struct aio_mapped_range iocb_range; /* maps user iocbs */
+};
+
struct kioctx {
struct percpu_ref users;
atomic_t dead;
@@ -127,6 +139,11 @@ struct kioctx {
struct page **ring_pages;
long nr_pages;
+ /* if used, completion and submission rings */
+ struct aio_iocb_ring sq_ring;
+ struct aio_mapped_range cq_ring;
+ int cq_ring_overflow;
+
struct rcu_work free_rwork; /* see free_ioctx() */
/*
@@ -280,6 +297,13 @@ static struct vfsmount *aio_mnt;
static const struct file_operations aio_ring_fops;
static const struct address_space_operations aio_ctx_aops;
+static const unsigned int array_page_shift =
+ ilog2(PAGE_SIZE / sizeof(u32));
+static const unsigned int iocb_page_shift =
+ ilog2(PAGE_SIZE / sizeof(struct iocb));
+static const unsigned int event_page_shift =
+ ilog2(PAGE_SIZE / sizeof(struct io_event));
+
/*
* We rely on block level unplugs to flush pending requests, if we schedule
*/
@@ -289,6 +313,7 @@ static const bool aio_use_state_req_list = true;
static const bool aio_use_state_req_list = false;
#endif
+static void aio_scqring_unmap(struct kioctx *);
static void aio_iopoll_reap_events(struct kioctx *);
static struct file *aio_private_file(struct kioctx *ctx, loff_t nr_pages)
@@ -519,6 +544,12 @@ static const struct address_space_operations aio_ctx_aops = {
#endif
};
+/* Polled IO or SQ/CQ rings don't use the old ring */
+static bool aio_ctx_old_ring(struct kioctx *ctx)
+{
+ return !(ctx->flags & (IOCTX_FLAG_IOPOLL | IOCTX_FLAG_SCQRING));
+}
+
static int aio_setup_ring(struct kioctx *ctx, unsigned int nr_events)
{
struct aio_ring *ring;
@@ -533,7 +564,7 @@ static int aio_setup_ring(struct kioctx *ctx, unsigned int nr_events)
* IO polling doesn't require any io event entries
*/
size = sizeof(struct aio_ring);
- if (!(ctx->flags & IOCTX_FLAG_IOPOLL)) {
+ if (aio_ctx_old_ring(ctx)) {
nr_events += 2; /* 1 is required, 2 for good luck */
size += sizeof(struct io_event) * nr_events;
}
@@ -625,7 +656,7 @@ static int aio_setup_ring(struct kioctx *ctx, unsigned int nr_events)
*/
static bool aio_ctx_supports_cancel(struct kioctx *ctx)
{
- return (ctx->flags & IOCTX_FLAG_IOPOLL) == 0;
+ return (ctx->flags & (IOCTX_FLAG_IOPOLL | IOCTX_FLAG_SCQRING)) == 0;
}
#define AIO_EVENTS_PER_PAGE (PAGE_SIZE / sizeof(struct io_event))
@@ -661,6 +692,7 @@ static void free_ioctx(struct work_struct *work)
free_rwork);
pr_debug("freeing %p\n", ctx);
+ aio_scqring_unmap(ctx);
aio_free_ring(ctx);
free_percpu(ctx->cpu);
percpu_ref_exit(&ctx->reqs);
@@ -1205,6 +1237,39 @@ static void aio_fill_event(struct io_event *ev, struct aio_kiocb *iocb,
ev->res2 = res2;
}
+static void aio_commit_cqring(struct kioctx *ctx, unsigned next_tail)
+{
+ struct aio_cq_ring *ring = page_address(ctx->cq_ring.pages[0]);
+
+ if (next_tail != ring->tail) {
+ ring->tail = next_tail;
+ smp_wmb();
+ }
+}
+
+static struct io_event *aio_peek_cqring(struct kioctx *ctx, unsigned *ntail)
+{
+ struct aio_cq_ring *ring;
+ struct io_event *ev;
+ unsigned tail;
+
+ ring = page_address(ctx->cq_ring.pages[0]);
+
+ smp_rmb();
+ tail = READ_ONCE(ring->tail);
+ *ntail = tail + 1;
+ if (*ntail == ring->nr_events)
+ *ntail = 0;
+ if (*ntail == READ_ONCE(ring->head))
+ return NULL;
+
+ /* io_event array starts offset one into the mapped range */
+ tail++;
+ ev = page_address(ctx->cq_ring.pages[tail >> event_page_shift]);
+ tail &= ((1 << event_page_shift) - 1);
+ return ev + tail;
+}
+
static void aio_ring_complete(struct kioctx *ctx, struct aio_kiocb *iocb,
long res, long res2)
{
@@ -1266,7 +1331,36 @@ static void aio_complete(struct aio_kiocb *iocb, long res, long res2)
{
struct kioctx *ctx = iocb->ki_ctx;
- aio_ring_complete(ctx, iocb, res, res2);
+ if (ctx->flags & IOCTX_FLAG_SCQRING) {
+ unsigned long flags;
+ struct io_event *ev;
+ unsigned int tail;
+
+ /*
+ * If we can't get a cq entry, userspace overflowed the
+ * submission (by quite a lot). Flag it as an overflow
+ * condition, and next io_ring_enter(2) call will return
+ * -EOVERFLOW.
+ */
+ spin_lock_irqsave(&ctx->completion_lock, flags);
+ ev = aio_peek_cqring(ctx, &tail);
+ if (ev) {
+ aio_fill_event(ev, iocb, res, res2);
+ aio_commit_cqring(ctx, tail);
+ } else
+ ctx->cq_ring_overflow = 1;
+ spin_unlock_irqrestore(&ctx->completion_lock, flags);
+ } else {
+ aio_ring_complete(ctx, iocb, res, res2);
+
+ /*
+ * We have to order our ring_info tail store above and test
+ * of the wait list below outside the wait lock. This is
+ * like in wake_up_bit() where clearing a bit has to be
+ * ordered with the unlocked test.
+ */
+ smp_mb();
+ }
/*
* Check if the user asked us to deliver the result through an
@@ -1278,14 +1372,6 @@ static void aio_complete(struct aio_kiocb *iocb, long res, long res2)
eventfd_ctx_put(iocb->ki_eventfd);
}
- /*
- * We have to order our ring_info tail store above and test
- * of the wait list below outside the wait lock. This is
- * like in wake_up_bit() where clearing a bit has to be
- * ordered with the unlocked test.
- */
- smp_mb();
-
if (waitqueue_active(&ctx->wait))
wake_up(&ctx->wait);
iocb_put(iocb);
@@ -1408,6 +1494,9 @@ static long aio_iopoll_reap(struct kioctx *ctx, struct io_event __user *evs,
return 0;
list_for_each_entry_safe(iocb, n, &ctx->poll_completing, ki_list) {
+ struct io_event *ev = NULL;
+ unsigned int next_tail;
+
if (*nr_events == max)
break;
if (!test_bit(KIOCB_F_POLL_COMPLETED, &iocb->ki_flags))
@@ -1415,6 +1504,14 @@ static long aio_iopoll_reap(struct kioctx *ctx, struct io_event __user *evs,
if (to_free == AIO_IOPOLL_BATCH)
iocb_put_many(ctx, iocbs, &to_free);
+ /* Will only happen if the application over-commits */
+ ret = -EAGAIN;
+ if (ctx->flags & IOCTX_FLAG_SCQRING) {
+ ev = aio_peek_cqring(ctx, &next_tail);
+ if (!ev)
+ break;
+ }
+
list_del(&iocb->ki_list);
iocbs[to_free++] = iocb;
@@ -1433,8 +1530,11 @@ static long aio_iopoll_reap(struct kioctx *ctx, struct io_event __user *evs,
file_count = 1;
}
- if (evs && copy_to_user(evs + *nr_events, &iocb->ki_ev,
- sizeof(iocb->ki_ev))) {
+ if (ev) {
+ memcpy(ev, &iocb->ki_ev, sizeof(*ev));
+ aio_commit_cqring(ctx, next_tail);
+ } else if (evs && copy_to_user(evs + *nr_events, &iocb->ki_ev,
+ sizeof(iocb->ki_ev))) {
ret = -EFAULT;
break;
}
@@ -1615,24 +1715,139 @@ static long read_events(struct kioctx *ctx, long min_nr, long nr,
return ret;
}
+static void aio_unmap_range(struct aio_mapped_range *range)
+{
+ int i;
+
+ if (!range->nr_pages)
+ return;
+
+ for (i = 0; i < range->nr_pages; i++)
+ put_page(range->pages[i]);
+
+ kfree(range->pages);
+ range->pages = NULL;
+ range->nr_pages = 0;
+}
+
+static int aio_map_range(struct aio_mapped_range *range, void __user *uaddr,
+ size_t size, int gup_flags)
+{
+ int nr_pages, ret;
+
+ if ((unsigned long) uaddr & ~PAGE_MASK)
+ return -EINVAL;
+
+ nr_pages = (size + PAGE_SIZE - 1) >> PAGE_SHIFT;
+
+ range->pages = kzalloc(nr_pages * sizeof(struct page *), GFP_KERNEL);
+ if (!range->pages)
+ return -ENOMEM;
+
+ down_write(&current->mm->mmap_sem);
+ ret = get_user_pages((unsigned long) uaddr, nr_pages, gup_flags,
+ range->pages, NULL);
+ up_write(&current->mm->mmap_sem);
+
+ if (ret < nr_pages) {
+ /* drop the references on any pages we did manage to pin */
+ while (ret > 0)
+ put_page(range->pages[--ret]);
+ kfree(range->pages);
+ range->pages = NULL;
+ return -ENOMEM;
+ }
+
+ range->nr_pages = nr_pages;
+ return 0;
+}
+
+static void aio_scqring_unmap(struct kioctx *ctx)
+{
+ aio_unmap_range(&ctx->sq_ring.ring_range);
+ aio_unmap_range(&ctx->sq_ring.iocb_range);
+ aio_unmap_range(&ctx->cq_ring);
+}
+
+static int aio_scqring_map(struct kioctx *ctx,
+ struct aio_sq_ring __user *sq_ring,
+ struct aio_cq_ring __user *cq_ring)
+{
+ int ret, sq_ring_size, cq_ring_size;
+ struct aio_cq_ring *kcq_ring;
+ void __user *uptr;
+ size_t size;
+
+ /* Two is the minimum size we can support. */
+ if (ctx->max_reqs < 2)
+ return -EINVAL;
+
+ /*
+ * The SQ ring is sized to QD entries. The CQ ring we make twice
+ * that in size, to leave room for completions when more IO is
+ * inflight than the QD.
+ */
+ sq_ring_size = ctx->max_reqs;
+ cq_ring_size = 2 * ctx->max_reqs;
+
+ /* Map SQ ring and iocbs */
+ size = sizeof(struct aio_sq_ring) + sq_ring_size * sizeof(u32);
+ ret = aio_map_range(&ctx->sq_ring.ring_range, sq_ring, size, FOLL_WRITE);
+ if (ret)
+ return ret;
+
+ ctx->sq_ring.ring = page_address(ctx->sq_ring.ring_range.pages[0]);
+ if (ctx->sq_ring.ring->nr_events < sq_ring_size) {
+ ret = -EFAULT;
+ goto err;
+ }
+ ctx->sq_ring.ring->nr_events = sq_ring_size;
+ ctx->sq_ring.ring->head = ctx->sq_ring.ring->tail = 0;
+
+ size = sizeof(struct iocb) * sq_ring_size;
+ uptr = (void __user *) (unsigned long) ctx->sq_ring.ring->iocbs;
+ ret = aio_map_range(&ctx->sq_ring.iocb_range, uptr, size, 0);
+ if (ret)
+ goto err;
+
+ /* Map CQ ring and io_events */
+ size = sizeof(struct aio_cq_ring) +
+ cq_ring_size * sizeof(struct io_event);
+ ret = aio_map_range(&ctx->cq_ring, cq_ring, size, FOLL_WRITE);
+ if (ret)
+ goto err;
+
+ kcq_ring = page_address(ctx->cq_ring.pages[0]);
+ if (kcq_ring->nr_events < cq_ring_size) {
+ ret = -EFAULT;
+ goto err;
+ }
+ kcq_ring->nr_events = cq_ring_size;
+ kcq_ring->head = kcq_ring->tail = 0;
+
+err:
+ if (ret) {
+ aio_unmap_range(&ctx->sq_ring.ring_range);
+ aio_unmap_range(&ctx->sq_ring.iocb_range);
+ aio_unmap_range(&ctx->cq_ring);
+ }
+ return ret;
+}
+
/* sys_io_setup2:
* Like sys_io_setup(), except that it takes a set of flags
* (IOCTX_FLAG_*), and some pointers to user structures:
*
- * *user1 - reserved for future use
+ * *sq_ring - pointer to the userspace SQ ring, if used.
*
- * *user2 - reserved for future use.
+ * *cq_ring - pointer to the userspace CQ ring, if used.
*/
-SYSCALL_DEFINE5(io_setup2, u32, nr_events, u32, flags, void __user *, user1,
- void __user *, user2, aio_context_t __user *, ctxp)
+SYSCALL_DEFINE5(io_setup2, u32, nr_events, u32, flags,
+ struct aio_sq_ring __user *, sq_ring,
+ struct aio_cq_ring __user *, cq_ring,
+ aio_context_t __user *, ctxp)
{
struct kioctx *ioctx;
unsigned long ctx;
long ret;
- if (user1 || user2)
- return -EINVAL;
- if (flags & ~IOCTX_FLAG_IOPOLL)
+ if (flags & ~(IOCTX_FLAG_IOPOLL | IOCTX_FLAG_SCQRING))
return -EINVAL;
ret = get_user(ctx, ctxp);
@@ -1644,9 +1859,17 @@ SYSCALL_DEFINE5(io_setup2, u32, nr_events, u32, flags, void __user *, user1,
if (IS_ERR(ioctx))
goto out;
+ if (flags & IOCTX_FLAG_SCQRING) {
+ ret = aio_scqring_map(ioctx, sq_ring, cq_ring);
+ if (ret)
+ goto err;
+ }
+
ret = put_user(ioctx->user_id, ctxp);
- if (ret)
+ if (ret) {
+err:
kill_ioctx(current->mm, ioctx, NULL);
+ }
percpu_ref_put(&ioctx->users);
out:
return ret;
@@ -2323,8 +2546,7 @@ static int __io_submit_one(struct kioctx *ctx, const struct iocb *iocb,
return -EINVAL;
}
- /* Poll IO doesn't need ring reservations */
- if (!(ctx->flags & IOCTX_FLAG_IOPOLL) && !get_reqs_available(ctx))
+ if (aio_ctx_old_ring(ctx) && !get_reqs_available(ctx))
return -EAGAIN;
ret = -EAGAIN;
@@ -2413,7 +2635,7 @@ static int __io_submit_one(struct kioctx *ctx, const struct iocb *iocb,
eventfd_ctx_put(req->ki_eventfd);
iocb_put(req);
out_put_reqs_available:
- if (!(ctx->flags & IOCTX_FLAG_IOPOLL))
+ if (aio_ctx_old_ring(ctx))
put_reqs_available(ctx, 1);
return ret;
}
@@ -2473,6 +2695,211 @@ static void aio_submit_state_start(struct aio_submit_state *state,
#endif
}
+static const struct iocb *aio_iocb_from_index(struct kioctx *ctx, unsigned idx)
+{
+ struct aio_mapped_range *range = &ctx->sq_ring.iocb_range;
+ const struct iocb *iocb;
+
+ iocb = page_address(range->pages[idx >> iocb_page_shift]);
+ idx &= ((1 << iocb_page_shift) - 1);
+ return iocb + idx;
+}
+
+static void aio_commit_sqring(struct kioctx *ctx, unsigned next_head)
+{
+ struct aio_sq_ring *ring = ctx->sq_ring.ring;
+
+ if (ring->head != next_head) {
+ ring->head = next_head;
+ smp_wmb();
+ }
+}
+
+static const struct iocb *aio_peek_sqring(struct kioctx *ctx, unsigned *nhead)
+{
+ struct aio_mapped_range *range = &ctx->sq_ring.ring_range;
+ struct aio_sq_ring *ring = ctx->sq_ring.ring;
+ unsigned head, index;
+ u32 *array;
+
+ smp_rmb();
+ head = READ_ONCE(ring->head);
+ if (head == READ_ONCE(ring->tail))
+ return NULL;
+
+ *nhead = head + 1;
+ if (*nhead == ring->nr_events)
+ *nhead = 0;
+
+ /*
+ * No guarantee the array is in the first page, so we can't just
+ * index ring->array. Find the map and offset from the head.
+ */
+ head += offsetof(struct aio_sq_ring, array) >> 2;
+ array = page_address(range->pages[head >> array_page_shift]);
+ head &= ((1 << array_page_shift) - 1);
+ index = array[head];
+
+ if (index < ring->nr_events)
+ return aio_iocb_from_index(ctx, index);
+
+ /* drop invalid entries */
+ aio_commit_sqring(ctx, *nhead);
+ return NULL;
+}
+
+static int aio_ring_submit(struct kioctx *ctx, unsigned int to_submit)
+{
+ struct aio_submit_state state, *statep = NULL;
+ int i, ret = 0, submit = 0;
+
+ if (to_submit > AIO_PLUG_THRESHOLD) {
+ aio_submit_state_start(&state, ctx, to_submit);
+ statep = &state;
+ }
+
+ for (i = 0; i < to_submit; i++) {
+ const struct iocb *iocb;
+ unsigned int next_head;
+
+ iocb = aio_peek_sqring(ctx, &next_head);
+ if (!iocb)
+ break;
+
+ ret = __io_submit_one(ctx, iocb, NULL, statep, false);
+ if (ret)
+ break;
+
+ submit++;
+ aio_commit_sqring(ctx, next_head);
+ }
+
+ if (statep)
+ aio_submit_state_end(statep);
+
+ return submit ? submit : ret;
+}
+
+/*
+ * Wait until events become available, if we don't already have some. The
+ * application must reap them itself, as they reside on the shared cq ring.
+ */
+static int aio_cqring_wait(struct kioctx *ctx, int min_events)
+{
+ struct aio_cq_ring *ring = page_address(ctx->cq_ring.pages[0]);
+ DEFINE_WAIT(wait);
+ int ret = 0;
+
+ smp_rmb();
+ if (ring->head != ring->tail)
+ return 0;
+ if (!min_events)
+ return 0;
+
+ do {
+ prepare_to_wait(&ctx->wait, &wait, TASK_INTERRUPTIBLE);
+
+ ret = 0;
+ smp_rmb();
+ if (ring->head != ring->tail)
+ break;
+
+ schedule();
+
+ ret = -EINVAL;
+ if (atomic_read(&ctx->dead))
+ break;
+ ret = -EINTR;
+ if (signal_pending(current))
+ break;
+ } while (1);
+
+ finish_wait(&ctx->wait, &wait);
+ return ret;
+}
+
+static int __io_ring_enter(struct kioctx *ctx, unsigned int to_submit,
+ unsigned int min_complete, unsigned int flags)
+{
+ int ret = 0;
+
+ if (flags & IORING_FLAG_SUBMIT) {
+ ret = aio_ring_submit(ctx, to_submit);
+ if (ret < 0)
+ return ret;
+ }
+ if (flags & IORING_FLAG_GETEVENTS) {
+ unsigned int nr_events = 0;
+ int get_ret;
+
+ if (!ret && to_submit)
+ min_complete = 0;
+
+ if (ctx->flags & IOCTX_FLAG_IOPOLL)
+ get_ret = __aio_iopoll_check(ctx, NULL, &nr_events,
+ min_complete, -1U);
+ else
+ get_ret = aio_cqring_wait(ctx, min_complete);
+
+ if (get_ret < 0 && !ret)
+ ret = get_ret;
+ }
+
+ return ret;
+}
+
+/* sys_io_ring_enter:
+ * Alternative way to both submit and complete IO, instead of using
+ * io_submit(2) and io_getevents(2). Requires the use of the SQ/CQ
+ * ring interface, hence the io_context must be setup with
+ * io_setup2() and IOCTX_FLAG_SCQRING must be specified (and the
+ * sq_ring/cq_ring passed in).
+ *
+ * Returns the number of IOs submitted, if IORING_FLAG_SUBMIT
+ * is used, otherwise returns 0 for IORING_FLAG_GETEVENTS success,
+ * but not the number of events, as those will have to be found
+ * by the application by reading the CQ ring anyway.
+ *
+ * Apart from that, the error returns are much like io_submit()
+ * and io_getevents(), since a lot of the same error conditions
+ * are shared.
+ */
+SYSCALL_DEFINE4(io_ring_enter, aio_context_t, ctx_id, u32, to_submit,
+ u32, min_complete, u32, flags)
+{
+ struct kioctx *ctx;
+ long ret;
+
+ ctx = lookup_ioctx(ctx_id);
+ if (!ctx) {
+ pr_debug("EINVAL: invalid context id\n");
+ return -EINVAL;
+ }
+
+ ret = -EBUSY;
+ if (!mutex_trylock(&ctx->getevents_lock))
+ goto err;
+
+ ret = -EOVERFLOW;
+ if (ctx->cq_ring_overflow) {
+ ctx->cq_ring_overflow = 0;
+ goto err_unlock;
+ }
+
+ ret = -EINVAL;
+ if (unlikely(atomic_read(&ctx->dead)))
+ goto err_unlock;
+
+ if (ctx->flags & IOCTX_FLAG_SCQRING)
+ ret = __io_ring_enter(ctx, to_submit, min_complete, flags);
+
+err_unlock:
+ mutex_unlock(&ctx->getevents_lock);
+err:
+ percpu_ref_put(&ctx->users);
+ return ret;
+}
+
/* sys_io_submit:
* Queue the nr iocbs pointed to by iocbpp for processing. Returns
* the number of iocbs queued. May return -EINVAL if the aio_context
@@ -2502,6 +2929,10 @@ SYSCALL_DEFINE3(io_submit, aio_context_t, ctx_id, long, nr,
return -EINVAL;
}
+ /* SCQRING must use io_ring_enter() */
+ if (ctx->flags & IOCTX_FLAG_SCQRING)
+ return -EINVAL;
+
if (nr > ctx->nr_events)
nr = ctx->nr_events;
@@ -2653,7 +3084,10 @@ static long do_io_getevents(aio_context_t ctx_id,
long ret = -EINVAL;
if (likely(ioctx)) {
- if (likely(min_nr <= nr && min_nr >= 0)) {
+ /* SCQRING must use io_ring_enter() */
+ if (ioctx->flags & IOCTX_FLAG_SCQRING)
+ ret = -EINVAL;
+ else if (min_nr <= nr && min_nr >= 0) {
if (ioctx->flags & IOCTX_FLAG_IOPOLL)
ret = aio_iopoll_check(ioctx, min_nr, nr, events);
else
@@ -287,8 +287,10 @@ static inline void addr_limit_user_check(void)
*/
#ifndef CONFIG_ARCH_HAS_SYSCALL_WRAPPER
asmlinkage long sys_io_setup(unsigned nr_reqs, aio_context_t __user *ctx);
-asmlinkage long sys_io_setup2(unsigned, unsigned, void __user *, void __user *,
+asmlinkage long sys_io_setup2(unsigned, unsigned, struct aio_sq_ring __user *,
+ struct aio_cq_ring __user *,
aio_context_t __user *);
+asmlinkage long sys_io_ring_enter(aio_context_t, unsigned, unsigned, unsigned);
asmlinkage long sys_io_destroy(aio_context_t ctx);
asmlinkage long sys_io_submit(aio_context_t, long,
struct iocb __user * __user *);
@@ -109,6 +109,35 @@ struct iocb {
}; /* 64 bytes */
#define IOCTX_FLAG_IOPOLL (1 << 0) /* io_context is polled */
+#define IOCTX_FLAG_SCQRING (1 << 1) /* Use SQ/CQ rings */
+
+struct aio_sq_ring {
+ union {
+ struct {
+ u32 head; /* kernel consumer head */
+ u32 tail; /* app producer tail */
+ u32 nr_events; /* max events in ring */
+ u64 iocbs; /* setup pointer to app iocbs */
+ };
+ u32 pad[16];
+ };
+ u32 array[0]; /* actual ring, index to iocbs */
+};
+
+struct aio_cq_ring {
+ union {
+ struct {
+ u32 head; /* app consumer head */
+ u32 tail; /* kernel producer tail */
+ u32 nr_events; /* max events in ring */
+ };
+ struct io_event pad;
+ };
+ struct io_event events[0]; /* ring, array of io_events */
+};
+
+#define IORING_FLAG_SUBMIT (1 << 0)
+#define IORING_FLAG_GETEVENTS (1 << 1)
#undef IFBIG
#undef IFLITTLE
@@ -38,6 +38,7 @@ asmlinkage long sys_ni_syscall(void)
COND_SYSCALL(io_setup);
COND_SYSCALL(io_setup2);
+COND_SYSCALL(io_ring_enter);
COND_SYSCALL_COMPAT(io_setup);
COND_SYSCALL(io_destroy);
COND_SYSCALL(io_submit);
Experimental support for submitting and completing IO through rings shared between the application and kernel. The submission rings are struct iocb, like we would submit through io_submit(), and the completion rings are struct io_event, like we would pass in (and copy back) from io_getevents(). A new system call is added for this, io_ring_enter(). This system call submits IO that is queued in the SQ ring, and/or completes IO and stores the results in the CQ ring. This could be augmented with a kernel thread that does the submission and polling, then the application would never have to enter the kernel to do IO. Sample application: http://git.kernel.dk/cgit/fio/plain/t/aio-ring.c Signed-off-by: Jens Axboe <axboe@kernel.dk> --- arch/x86/entry/syscalls/syscall_64.tbl | 1 + fs/aio.c | 484 +++++++++++++++++++++++-- include/linux/syscalls.h | 4 +- include/uapi/linux/aio_abi.h | 29 ++ kernel/sys_ni.c | 1 + 5 files changed, 493 insertions(+), 26 deletions(-)