From patchwork Tue Sep 13 10:02:30 2016
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
X-Patchwork-Submitter: Stefan Hajnoczi
X-Patchwork-Id: 9328679
Return-Path:
Received: from mail.wl.linuxfoundation.org (pdx-wl-mail.web.codeaurora.org
 [172.30.200.125])
 by pdx-korg-patchwork.web.codeaurora.org (Postfix) with ESMTP id 6C81A6048F
 for ; Tue, 13 Sep 2016 10:03:46 +0000 (UTC)
Received: from mail.wl.linuxfoundation.org (localhost [127.0.0.1])
 by mail.wl.linuxfoundation.org (Postfix) with ESMTP id 52ED329290
 for ; Tue, 13 Sep 2016 10:03:46 +0000 (UTC)
Received: by mail.wl.linuxfoundation.org (Postfix, from userid 486)
 id 47A2729298; Tue, 13 Sep 2016 10:03:46 +0000 (UTC)
X-Spam-Checker-Version: SpamAssassin 3.3.1 (2010-03-16) on
 pdx-wl-mail.web.codeaurora.org
X-Spam-Level:
X-Spam-Status: No, score=-6.9 required=2.0 tests=BAYES_00,RCVD_IN_DNSWL_HI
 autolearn=ham version=3.3.1
Received: from lists.gnu.org (lists.gnu.org [208.118.235.17])
 (using TLSv1 with cipher AES256-SHA (256/256 bits))
 (No client certificate requested)
 by mail.wl.linuxfoundation.org (Postfix) with ESMTPS id 51B5E29290
 for ; Tue, 13 Sep 2016 10:03:42 +0000 (UTC)
Received: from localhost ([::1]:47627 helo=lists.gnu.org)
 by lists.gnu.org with esmtp (Exim 4.71)
 (envelope-from ) id 1bjkZ3-0005JN-Gr
 for patchwork-qemu-devel@patchwork.kernel.org;
 Tue, 13 Sep 2016 06:03:41 -0400
Received: from eggs.gnu.org ([2001:4830:134:3::10]:33153)
 by lists.gnu.org with esmtp (Exim 4.71)
 (envelope-from ) id 1bjkYP-0005FT-6O
 for qemu-devel@nongnu.org; Tue, 13 Sep 2016 06:03:05 -0400
Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71)
 (envelope-from ) id 1bjkYJ-0001j7-QG
 for qemu-devel@nongnu.org; Tue, 13 Sep 2016 06:03:00 -0400
Received: from mx1.redhat.com ([209.132.183.28]:51416)
 by eggs.gnu.org with esmtp (Exim 4.71)
 (envelope-from ) id 1bjkYJ-0001ix-Hz
 for qemu-devel@nongnu.org; Tue, 13 Sep 2016 06:02:55 -0400
Received: from int-mx11.intmail.prod.int.phx2.redhat.com
 (int-mx11.intmail.prod.int.phx2.redhat.com [10.5.11.24])
 (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits))
 (No client certificate requested)
 by mx1.redhat.com (Postfix) with ESMTPS id D65C443A55;
 Tue, 13 Sep 2016 10:02:54 +0000 (UTC)
Received: from localhost (ovpn-112-64.ams2.redhat.com [10.36.112.64])
 by int-mx11.intmail.prod.int.phx2.redhat.com (8.14.4/8.14.4) with ESMTP
 id u8DA2ruc024896; Tue, 13 Sep 2016 06:02:53 -0400
From: Stefan Hajnoczi
To:
Date: Tue, 13 Sep 2016 11:02:30 +0100
Message-Id: <1473760967-31840-3-git-send-email-stefanha@redhat.com>
In-Reply-To: <1473760967-31840-1-git-send-email-stefanha@redhat.com>
References: <1473760967-31840-1-git-send-email-stefanha@redhat.com>
X-Scanned-By: MIMEDefang 2.68 on 10.5.11.24
X-Greylist: Sender IP whitelisted, not delayed by milter-greylist-4.5.16
 (mx1.redhat.com [10.5.110.30]); Tue, 13 Sep 2016 10:02:54 +0000 (UTC)
X-detected-operating-system: by eggs.gnu.org: GNU/Linux 2.2.x-3.x [generic]
X-Received-From: 209.132.183.28
Subject: [Qemu-devel] [PULL v2 02/19] linux-aio: consume events in userspace
 instead of calling io_getevents
X-BeenThere: qemu-devel@nongnu.org
X-Mailman-Version: 2.1.21
Precedence: list
List-Id:
List-Unsubscribe: ,
List-Archive:
List-Post:
List-Help:
List-Subscribe: ,
Cc: Peter Maydell , Stefan Hajnoczi , Roman Pen , Paolo Bonzini
Errors-To: qemu-devel-bounces+patchwork-qemu-devel=patchwork.kernel.org@nongnu.org
Sender: "Qemu-devel"
X-Virus-Scanned: ClamAV using ClamSMTP

From: Roman Pen

An AIO context is represented in userspace as a simple ring buffer that
can be consumed directly without entering the kernel, which can bring
some performance gain.  QEMU does not use a timeout value when waiting
for event completions, so we can consume all events from userspace.

Signed-off-by: Roman Pen
Message-id: 1468931263-32667-2-git-send-email-roman.penyaev@profitbricks.com
Cc: Stefan Hajnoczi
Cc: Paolo Bonzini
Cc: qemu-devel@nongnu.org
Signed-off-by: Stefan Hajnoczi
---
 block/linux-aio.c | 129 ++++++++++++++++++++++++++++++++++++++++++------------
 1 file changed, 101 insertions(+), 28 deletions(-)

diff --git a/block/linux-aio.c b/block/linux-aio.c
index e906abe..62ee1ea 100644
--- a/block/linux-aio.c
+++ b/block/linux-aio.c
@@ -59,7 +59,6 @@ struct LinuxAioState {
 
     /* I/O completion processing */
     QEMUBH *completion_bh;
-    struct io_event events[MAX_EVENTS];
     int event_idx;
     int event_max;
 };
@@ -102,6 +101,85 @@ static void qemu_laio_process_completion(struct qemu_laiocb *laiocb)
     }
 }
 
+/**
+ * aio_ring buffer which is shared between userspace and kernel.
+ *
+ * This is copied from linux/fs/aio.c, common header does not exist
+ * but AIO exists for ages so we assume ABI is stable.
+ */
+struct aio_ring {
+    unsigned    id;    /* kernel internal index number */
+    unsigned    nr;    /* number of io_events */
+    unsigned    head;  /* Written to by userland or by kernel. */
+    unsigned    tail;
+
+    unsigned    magic;
+    unsigned    compat_features;
+    unsigned    incompat_features;
+    unsigned    header_length;  /* size of aio_ring */
+
+    struct io_event io_events[0];
+};
+
+/**
+ * io_getevents_peek:
+ * @ctx: AIO context
+ * @events: pointer on events array, output value
+ *
+ * Returns the number of completed events and sets a pointer
+ * on events array.  This function does not update the internal
+ * ring buffer, only reads head and tail.  When @events has been
+ * processed io_getevents_commit() must be called.
+ */
+static inline unsigned int io_getevents_peek(io_context_t ctx,
+                                             struct io_event **events)
+{
+    struct aio_ring *ring = (struct aio_ring *)ctx;
+    unsigned int head = ring->head, tail = ring->tail;
+    unsigned int nr;
+
+    nr = tail >= head ? tail - head : ring->nr - head;
+    *events = ring->io_events + head;
+    /* To avoid speculative loads of s->events[i] before observing tail.
+       Paired with smp_wmb() inside linux/fs/aio.c: aio_complete(). */
+    smp_rmb();
+
+    return nr;
+}
+
+/**
+ * io_getevents_commit:
+ * @ctx: AIO context
+ * @nr: the number of events on which head should be advanced
+ *
+ * Advances head of a ring buffer.
+ */
+static inline void io_getevents_commit(io_context_t ctx, unsigned int nr)
+{
+    struct aio_ring *ring = (struct aio_ring *)ctx;
+
+    if (nr) {
+        ring->head = (ring->head + nr) % ring->nr;
+    }
+}
+
+/**
+ * io_getevents_advance_and_peek:
+ * @ctx: AIO context
+ * @events: pointer on events array, output value
+ * @nr: the number of events on which head should be advanced
+ *
+ * Advances head of a ring buffer and returns number of elements left.
+ */
+static inline unsigned int
+io_getevents_advance_and_peek(io_context_t ctx,
+                              struct io_event **events,
+                              unsigned int nr)
+{
+    io_getevents_commit(ctx, nr);
+    return io_getevents_peek(ctx, events);
+}
+
 /* The completion BH fetches completed I/O requests and invokes their
  * callbacks.
  *
@@ -116,43 +194,38 @@ static void qemu_laio_process_completion(struct qemu_laiocb *laiocb)
 static void qemu_laio_completion_bh(void *opaque)
 {
     LinuxAioState *s = opaque;
-
-    /* Fetch more completion events when empty */
-    if (s->event_idx == s->event_max) {
-        do {
-            struct timespec ts = { 0 };
-            s->event_max = io_getevents(s->ctx, MAX_EVENTS, MAX_EVENTS,
-                                        s->events, &ts);
-        } while (s->event_max == -EINTR);
-
-        s->event_idx = 0;
-        if (s->event_max <= 0) {
-            s->event_max = 0;
-            return; /* no more events */
-        }
-        s->io_q.in_flight -= s->event_max;
-    }
+    struct io_event *events;
 
     /* Reschedule so nested event loops see currently pending completions */
     qemu_bh_schedule(s->completion_bh);
 
-    /* Process completion events */
-    while (s->event_idx < s->event_max) {
-        struct iocb *iocb = s->events[s->event_idx].obj;
-        struct qemu_laiocb *laiocb =
+    while ((s->event_max = io_getevents_advance_and_peek(s->ctx, &events,
+                                                         s->event_idx))) {
+        for (s->event_idx = 0; s->event_idx < s->event_max; ) {
+            struct iocb *iocb = events[s->event_idx].obj;
+            struct qemu_laiocb *laiocb =
                 container_of(iocb, struct qemu_laiocb, iocb);
 
-        laiocb->ret = io_event_ret(&s->events[s->event_idx]);
-        s->event_idx++;
+            laiocb->ret = io_event_ret(&events[s->event_idx]);
 
-        qemu_laio_process_completion(laiocb);
-    }
-
-    if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
-        ioq_submit(s);
+            /* Change counters one-by-one because we can be nested. */
+            s->io_q.in_flight--;
+            s->event_idx++;
+            qemu_laio_process_completion(laiocb);
+        }
     }
 
     qemu_bh_cancel(s->completion_bh);
+
+    /* If we are nested we have to notify the level above that we are done
+     * by setting event_max to zero, upper level will then jump out of its
+     * own `for` loop.  If we are the last, all counters dropped to zero. */
+    s->event_max = 0;
+    s->event_idx = 0;
+
+    if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) {
+        ioq_submit(s);
+    }
 }
 
 static void qemu_laio_completion_cb(EventNotifier *e)
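
For readers who want to try the peek/commit arithmetic outside QEMU, here is a minimal standalone sketch. It is not part of the patch and does not touch the kernel: struct demo_ring, struct demo_event and every demo_* function are hypothetical stand-ins for aio_ring, io_event and the io_getevents_* helpers. The producer is simulated in-process, and a C11 acquire fence stands in for smp_rmb(). It illustrates that peek only returns the contiguous run starting at head, so a batch that wraps past the end of the array takes two passes. That may be why the completion BH loops on io_getevents_advance_and_peek() until it returns zero.

```c
#include <assert.h>
#include <stdatomic.h>

#define RING_SLOTS 4u            /* hypothetical capacity, chosen for the demo */

struct demo_event {              /* stand-in for struct io_event */
    long res;
};

struct demo_ring {               /* stand-in for the head/tail part of aio_ring */
    unsigned nr;                 /* number of slots */
    unsigned head;               /* consumer index, advanced by the reader */
    unsigned tail;               /* producer index, advanced by the writer */
    struct demo_event events[RING_SLOTS];
};

/* Return the length of the contiguous run of completed events starting at
 * head and point *events at it.  Nothing is consumed. */
static unsigned demo_peek(struct demo_ring *ring, struct demo_event **events)
{
    unsigned head = ring->head, tail = ring->tail;
    unsigned nr = tail >= head ? tail - head : ring->nr - head;

    *events = ring->events + head;
    /* Stand-in for smp_rmb(): order the tail load before the event loads. */
    atomic_thread_fence(memory_order_acquire);
    return nr;
}

/* Consume nr events by advancing head, wrapping at the end of the array. */
static void demo_commit(struct demo_ring *ring, unsigned nr)
{
    if (nr) {
        ring->head = (ring->head + nr) % ring->nr;
    }
}

/* Simulated producer; in the real setup the kernel plays this role. */
static void demo_complete(struct demo_ring *ring, long res)
{
    assert((ring->tail + 1) % ring->nr != ring->head);  /* ring not full */
    ring->events[ring->tail].res = res;
    ring->tail = (ring->tail + 1) % ring->nr;
}

/* Drain everything currently visible.  Returns the number of events consumed
 * and stores in *passes how many peeks returned at least one event. */
static unsigned demo_drain(struct demo_ring *ring, unsigned *passes)
{
    struct demo_event *events;
    unsigned nr, total = 0;

    *passes = 0;
    while ((nr = demo_peek(ring, &events)) != 0) {
        total += nr;             /* a real consumer would handle events[0..nr-1] */
        demo_commit(ring, nr);
        (*passes)++;
    }
    return total;
}

/* Scenario: three completions, drain, then two more that straddle the end of
 * the array.  Returns the number of passes the second drain needed. */
static unsigned demo_wraparound_passes(void)
{
    struct demo_ring ring = { .nr = RING_SLOTS };
    unsigned passes;

    demo_complete(&ring, 1);
    demo_complete(&ring, 2);
    demo_complete(&ring, 3);
    demo_drain(&ring, &passes);  /* head == tail == 3 afterwards */

    demo_complete(&ring, 4);     /* lands in slot 3 */
    demo_complete(&ring, 5);     /* wraps around to slot 0 */
    demo_drain(&ring, &passes);
    return passes;
}
```

In this sketch one slot is always left unused, so head == tail unambiguously means "empty". That is an assumption of the demo, not something the patch states about the kernel ring.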