Message ID | 1468337139-18265-1-git-send-email-roman.penyaev@profitbricks.com (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
On 12/07/2016 17:25, Roman Pen wrote: > AIO context in userspace is represented as a simple ring buffer, which > can be consumed directly without entering the kernel, which obviously > can bring some performance gain. QEMU does not use timeout value for > waiting for events completions, so we can consume all events from > userspace. > > My VM configuration is the following: > > VCPU=8, MQ=8, 1 iothread > > FIO results: > > io_getevents in kernel: > > READ: io=56479MB, aggrb=1882.5MB/s, minb=1882.5MB/s, maxb=1882.5MB/s, mint=30003msec, maxt=30003msec > WRITE: io=56371MB, aggrb=1878.9MB/s, minb=1878.9MB/s, maxb=1878.9MB/s, mint=30003msec, maxt=30003msec > > io_getevents in userspace: > > READ: io=57576MB, aggrb=1919.2MB/s, minb=1919.2MB/s, maxb=1919.2MB/s, mint=30003msec, maxt=30003msec > WRITE: io=57492MB, aggrb=1916.3MB/s, minb=1916.3MB/s, maxb=1916.3MB/s, mint=30003msec, maxt=30003msec > > The gain is only ~2%, but the price is almost nothing. > > Signed-off-by: Roman Pen <roman.penyaev@profitbricks.com> > Cc: Stefan Hajnoczi <stefanha@redhat.com> > Cc: Paolo Bonzini <pbonzini@redhat.com> > Cc: qemu-devel@nongnu.org > --- > block/linux-aio.c | 86 ++++++++++++++++++++++++++++++++++++++++++++----------- > 1 file changed, 70 insertions(+), 16 deletions(-) The patch does not pass checkpatch.pl, but apart from this it's a nice trick. Paolo > diff --git a/block/linux-aio.c b/block/linux-aio.c > index fe7cece..4ab09c0 100644 > --- a/block/linux-aio.c > +++ b/block/linux-aio.c > @@ -58,7 +58,7 @@ struct LinuxAioState { > > /* I/O completion processing */ > QEMUBH *completion_bh; > - struct io_event events[MAX_EVENTS]; > + struct io_event *events; > int event_idx; > int event_max; > }; > @@ -101,6 +101,60 @@ static void qemu_laio_process_completion(struct qemu_laiocb *laiocb) > } > } > > +/** > + * aio_ring buffer which is shared between userspace and kernel. > + */ > +struct aio_ring { > + unsigned id; /* kernel internal index number */ > + unsigned nr; /* number of io_events */ > + unsigned head; /* Written to by userland or under ring_lock > + * mutex by aio_read_events_ring(). */ > + unsigned tail; > + > + unsigned magic; > + unsigned compat_features; > + unsigned incompat_features; > + unsigned header_length; /* size of aio_ring */ > + > + struct io_event io_events[0]; > +}; > + > +/** > + * io_getevents_peek() - returns the number of completed events > + * and sets a pointer on events array. > + * > + * This function does not update the internal ring buffer, only > + * reads head and tail. When @events has been processed > + * io_getevents_commit() must be called. > + */ > +static int io_getevents_peek(io_context_t aio_ctx, unsigned int max, > + struct io_event **events) > +{ > + struct aio_ring *ring = (struct aio_ring*)aio_ctx; > + unsigned head = ring->head, tail = ring->tail; > + unsigned nr; > + > + nr = tail >= head ? tail - head : ring->nr - head; > + *events = ring->io_events + head; > + /* To avoid speculative loads of s->events[i] before observing tail. > + Paired with smp_wmb() inside linux/fs/aio.c: aio_complete(). */ > + smp_rmb(); > + > + return nr; > +} > + > +/** > + * io_getevents_commit() - advances head of a ring buffer and returns > + * 1 if there are some events left in a ring. > + */ > +static int io_getevents_commit(io_context_t aio_ctx, unsigned int nr) > +{ > + struct aio_ring *ring = (struct aio_ring*)aio_ctx; > + > + ring->head = (ring->head + nr) % ring->nr; > + return (ring->head != ring->tail); > +} > + > /* The completion BH fetches completed I/O requests and invokes their > * callbacks. > * > @@ -118,32 +172,32 @@ static void qemu_laio_completion_bh(void *opaque) > > /* Fetch more completion events when empty */ > if (s->event_idx == s->event_max) { > - do { > - struct timespec ts = { 0 }; > - s->event_max = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS, > - s->events, &ts); > - } while (s->event_max == -EINTR); > - > +more: > + s->event_max = io_getevents_peek(s->ctx, MAX_EVENTS, &s->events); > s->event_idx = 0; > - if (s->event_max <= 0) { > - s->event_max = 0; > + if (s->event_max == 0) > return; /* no more events */ > - } > } > > /* Reschedule so nested event loops see currently pending completions */ > qemu_bh_schedule(s->completion_bh); > > /* Process completion events */ > - while (s->event_idx < s->event_max) { > - struct iocb *iocb = s->events[s->event_idx].obj; > - struct qemu_laiocb *laiocb = > + if (s->event_idx < s->event_max) { > + unsigned nr = s->event_max - s->event_idx; > + > + while (s->event_idx < s->event_max) { > + struct iocb *iocb = s->events[s->event_idx].obj; > + struct qemu_laiocb *laiocb = > container_of(iocb, struct qemu_laiocb, iocb); > > - laiocb->ret = io_event_ret(&s->events[s->event_idx]); > - s->event_idx++; > + laiocb->ret = io_event_ret(&s->events[s->event_idx]); > + s->event_idx++; > > - qemu_laio_process_completion(laiocb); > + qemu_laio_process_completion(laiocb); > + } > + if (io_getevents_commit(s->ctx, nr)) > + goto more; > } > > if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) { >
On Tue, Jul 12, 2016 at 05:25:39PM +0200, Roman Pen wrote: > @@ -101,6 +101,60 @@ static void qemu_laio_process_completion(struct qemu_laiocb *laiocb) > } > } > > +/** > + * aio_ring buffer which is shared between userspace and kernel. > + */ Please extend the comment to explain that this is copy-pasted from Linux fs/aio.c because there is no uapi header for this struct. Other userspace programs like fio(1) already use the ring buffer directly, so we assume it's stable ABI... > +struct aio_ring { > + unsigned id; /* kernel internal index number */ > + unsigned nr; /* number of io_events */ > + unsigned head; /* Written to by userland or under ring_lock > + * mutex by aio_read_events_ring(). */ This comment refers to the aio_read_events_ring() kernel function, which doesn't exist in the QEMU codebase. Please drop it. > + unsigned tail; > + > + unsigned magic; > + unsigned compat_features; > + unsigned incompat_features; > + unsigned header_length; /* size of aio_ring */ > + > + struct io_event io_events[0]; > +}; > + > +/** > + * io_getevents_peek() - returns the number of completed events > + * and sets a pointer on events array. > + * > + * This function does not update the internal ring buffer, only > + * reads head and tail. When @events has been processed > + * io_getevents_commit() must be called. > + */ Please follow the doc comment style in include/qom/object.h (it's the GTK Doc style). > +static int io_getevents_peek(io_context_t aio_ctx, unsigned int max, > + struct io_event **events) max is unused and can be dropped. > +{ > + struct aio_ring *ring = (struct aio_ring*)aio_ctx; > + unsigned head = ring->head, tail = ring->tail; > + unsigned nr; > + > + nr = tail >= head ? tail - head : ring->nr - head; > + *events = ring->io_events + head; > + /* To avoid speculative loads of s->events[i] before observing tail. > + Paired with smp_wmb() inside linux/fs/aio.c: aio_complete(). */ > + smp_rmb(); > + > + return nr; head and tail are unsigned. Any reason to make the return value int instead of unsigned? > +} > + > +/** > + * io_getevents_commit() - advances head of a ring buffer and returns > + * 1 if there are some events left in a ring. > + */ > +static int io_getevents_commit(io_context_t aio_ctx, unsigned int nr) QEMU uses the bool type for boolean values, so the return value should be bool. > +{ > + struct aio_ring *ring = (struct aio_ring*)aio_ctx; > + > + ring->head = (ring->head + nr) % ring->nr; > + return (ring->head != ring->tail); > +} > + > /* The completion BH fetches completed I/O requests and invokes their > * callbacks. > * > @@ -118,32 +172,32 @@ static void qemu_laio_completion_bh(void *opaque) > > /* Fetch more completion events when empty */ > if (s->event_idx == s->event_max) { > - do { > - struct timespec ts = { 0 }; > - s->event_max = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS, > - s->events, &ts); > - } while (s->event_max == -EINTR); > - > +more: > + s->event_max = io_getevents_peek(s->ctx, MAX_EVENTS, &s->events); > s->event_idx = 0; > - if (s->event_max <= 0) { > - s->event_max = 0; > + if (s->event_max == 0) > return; /* no more events */ > - } > } > > /* Reschedule so nested event loops see currently pending completions */ > qemu_bh_schedule(s->completion_bh); > > /* Process completion events */ > - while (s->event_idx < s->event_max) { > - struct iocb *iocb = s->events[s->event_idx].obj; > - struct qemu_laiocb *laiocb = > + if (s->event_idx < s->event_max) { > + unsigned nr = s->event_max - s->event_idx; > + > + while (s->event_idx < s->event_max) { > + struct iocb *iocb = s->events[s->event_idx].obj; > + struct qemu_laiocb *laiocb = > container_of(iocb, struct qemu_laiocb, iocb); > > - laiocb->ret = io_event_ret(&s->events[s->event_idx]); > - s->event_idx++; > + laiocb->ret = io_event_ret(&s->events[s->event_idx]); > + s->event_idx++; > > - qemu_laio_process_completion(laiocb); > + qemu_laio_process_completion(laiocb); > + } > + if (io_getevents_commit(s->ctx, nr)) > + goto more; QEMU always uses curly braces, even if the body of the if statement is only one line. scripts/checkpatch.pl should show this.
diff --git a/block/linux-aio.c b/block/linux-aio.c index fe7cece..4ab09c0 100644 --- a/block/linux-aio.c +++ b/block/linux-aio.c @@ -58,7 +58,7 @@ struct LinuxAioState { /* I/O completion processing */ QEMUBH *completion_bh; - struct io_event events[MAX_EVENTS]; + struct io_event *events; int event_idx; int event_max; }; @@ -101,6 +101,60 @@ static void qemu_laio_process_completion(struct qemu_laiocb *laiocb) } } +/** + * aio_ring buffer which is shared between userspace and kernel. + */ +struct aio_ring { + unsigned id; /* kernel internal index number */ + unsigned nr; /* number of io_events */ + unsigned head; /* Written to by userland or under ring_lock + * mutex by aio_read_events_ring(). */ + unsigned tail; + + unsigned magic; + unsigned compat_features; + unsigned incompat_features; + unsigned header_length; /* size of aio_ring */ + + struct io_event io_events[0]; +}; + +/** + * io_getevents_peek() - returns the number of completed events + * and sets a pointer on events array. + * + * This function does not update the internal ring buffer, only + * reads head and tail. When @events has been processed + * io_getevents_commit() must be called. + */ +static int io_getevents_peek(io_context_t aio_ctx, unsigned int max, + struct io_event **events) +{ + struct aio_ring *ring = (struct aio_ring*)aio_ctx; + unsigned head = ring->head, tail = ring->tail; + unsigned nr; + + nr = tail >= head ? tail - head : ring->nr - head; + *events = ring->io_events + head; + /* To avoid speculative loads of s->events[i] before observing tail. + Paired with smp_wmb() inside linux/fs/aio.c: aio_complete(). */ + smp_rmb(); + + return nr; +} + +/** + * io_getevents_commit() - advances head of a ring buffer and returns + * 1 if there are some events left in a ring. + */ +static int io_getevents_commit(io_context_t aio_ctx, unsigned int nr) +{ + struct aio_ring *ring = (struct aio_ring*)aio_ctx; + + ring->head = (ring->head + nr) % ring->nr; + return (ring->head != ring->tail); +} + /* The completion BH fetches completed I/O requests and invokes their * callbacks. * @@ -118,32 +172,32 @@ static void qemu_laio_completion_bh(void *opaque) /* Fetch more completion events when empty */ if (s->event_idx == s->event_max) { - do { - struct timespec ts = { 0 }; - s->event_max = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS, - s->events, &ts); - } while (s->event_max == -EINTR); - +more: + s->event_max = io_getevents_peek(s->ctx, MAX_EVENTS, &s->events); s->event_idx = 0; - if (s->event_max <= 0) { - s->event_max = 0; + if (s->event_max == 0) return; /* no more events */ - } } /* Reschedule so nested event loops see currently pending completions */ qemu_bh_schedule(s->completion_bh); /* Process completion events */ - while (s->event_idx < s->event_max) { - struct iocb *iocb = s->events[s->event_idx].obj; - struct qemu_laiocb *laiocb = + if (s->event_idx < s->event_max) { + unsigned nr = s->event_max - s->event_idx; + + while (s->event_idx < s->event_max) { + struct iocb *iocb = s->events[s->event_idx].obj; + struct qemu_laiocb *laiocb = container_of(iocb, struct qemu_laiocb, iocb); - laiocb->ret = io_event_ret(&s->events[s->event_idx]); - s->event_idx++; + laiocb->ret = io_event_ret(&s->events[s->event_idx]); + s->event_idx++; - qemu_laio_process_completion(laiocb); + qemu_laio_process_completion(laiocb); + } + if (io_getevents_commit(s->ctx, nr)) + goto more; } if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
AIO context in userspace is represented as a simple ring buffer, which can be consumed directly without entering the kernel, which obviously can bring some performance gain. QEMU does not use timeout value for waiting for events completions, so we can consume all events from userspace. My VM configuration is the following: VCPU=8, MQ=8, 1 iothread FIO results: io_getevents in kernel: READ: io=56479MB, aggrb=1882.5MB/s, minb=1882.5MB/s, maxb=1882.5MB/s, mint=30003msec, maxt=30003msec WRITE: io=56371MB, aggrb=1878.9MB/s, minb=1878.9MB/s, maxb=1878.9MB/s, mint=30003msec, maxt=30003msec io_getevents in userspace: READ: io=57576MB, aggrb=1919.2MB/s, minb=1919.2MB/s, maxb=1919.2MB/s, mint=30003msec, maxt=30003msec WRITE: io=57492MB, aggrb=1916.3MB/s, minb=1916.3MB/s, maxb=1916.3MB/s, mint=30003msec, maxt=30003msec The gain is only ~2%, but the price is almost nothing. Signed-off-by: Roman Pen <roman.penyaev@profitbricks.com> Cc: Stefan Hajnoczi <stefanha@redhat.com> Cc: Paolo Bonzini <pbonzini@redhat.com> Cc: qemu-devel@nongnu.org --- block/linux-aio.c | 86 ++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 70 insertions(+), 16 deletions(-)