
[2/2] xen/9pfs: yield when there isn't enough room on the ring

Message ID: 20200520014712.24213-2-sstabellini@kernel.org (mailing list archive)
State: New, archived
Series: revert 9pfs reply truncation, wait for free room to reply

Commit Message

Stefano Stabellini May 20, 2020, 1:47 a.m. UTC
From: Stefano Stabellini <stefano.stabellini@xilinx.com>

Instead of truncating replies, which is problematic, wait until the
client reads more data and frees bytes on the reply ring.

Do that by calling qemu_coroutine_yield(). The corresponding
qemu_coroutine_enter_if_inactive() is called from xen_9pfs_bh upon
receiving the next notification from the client.

We need to be careful to avoid races in case xen_9pfs_bh and the
coroutine are both active at the same time. In xen_9pfs_bh, wait until
either the critical section is over (ring->co == NULL) or until the
coroutine becomes inactive (qemu_coroutine_yield() was called) before
continuing. Then, simply wake up the coroutine if it is inactive.

Signed-off-by: Stefano Stabellini <stefano.stabellini@xilinx.com>
---
 hw/9pfs/xen-9p-backend.c | 28 ++++++++++++++++++++++------
 1 file changed, 22 insertions(+), 6 deletions(-)
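
In code terms, the handshake this introduces looks as follows, condensed from
the diff below; the comments are added here for clarity and are not part of
the patch:

/* Coroutine side, xen_9pfs_init_in_iov_from_pdu(): */
    ring->co = qemu_coroutine_self();   /* enter the critical section */
    smp_wmb();
again:
    xen_9pfs_in_sg(ring, ring->sg, &num, pdu->idx, size);
    buf_size = iov_size(ring->sg, num);
    if (buf_size < size) {
        qemu_coroutine_yield();         /* wait for the client to free space */
        goto again;
    }
    ring->co = NULL;                    /* critical section is over */
    smp_wmb();

/* Bottom half, xen_9pfs_bh(), run on the next notification from the client: */
again:
    wait = ring->co != NULL && qemu_coroutine_entered(ring->co);
    smp_rmb();
    if (wait) {                         /* coroutine still active: spin until
                                           it either finishes or yields */
        cpu_relax();
        goto again;
    }
    if (ring->co != NULL) {
        qemu_coroutine_enter_if_inactive(ring->co);   /* wake up the waiter */
    }
    xen_9pfs_receive(ring);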

Comments

Christian Schoenebeck May 20, 2020, 11:23 a.m. UTC | #1
On Mittwoch, 20. Mai 2020 03:47:12 CEST Stefano Stabellini wrote:
> From: Stefano Stabellini <stefano.stabellini@xilinx.com>
> 
> Instead of truncating replies, which is problematic, wait until the
> client reads more data and frees bytes on the reply ring.
> 
> Do that by calling qemu_coroutine_yield(). The corresponding
> qemu_coroutine_enter_if_inactive() is called from xen_9pfs_bh upon
> receiving the next notification from the client.
> 
> We need to be careful to avoid races in case xen_9pfs_bh and the
> coroutine are both active at the same time. In xen_9pfs_bh, wait until
> either the critical section is over (ring->co == NULL) or until the
> coroutine becomes inactive (qemu_coroutine_yield() was called) before
> continuing. Then, simply wake up the coroutine if it is inactive.
> 
> Signed-off-by: Stefano Stabellini <stefano.stabellini@xilinx.com>
> ---

In general this patch makes sense to me, and it is a much better and cleaner
solution than what we discussed before. Just one detail ...

>  hw/9pfs/xen-9p-backend.c | 28 ++++++++++++++++++++++------
>  1 file changed, 22 insertions(+), 6 deletions(-)
> 
> diff --git a/hw/9pfs/xen-9p-backend.c b/hw/9pfs/xen-9p-backend.c
> index fc197f6c8a..3939539028 100644
> --- a/hw/9pfs/xen-9p-backend.c
> +++ b/hw/9pfs/xen-9p-backend.c
> @@ -37,6 +37,7 @@ typedef struct Xen9pfsRing {
> 
>      struct iovec *sg;
>      QEMUBH *bh;
> +    Coroutine *co;
> 
>      /* local copies, so that we can read/write PDU data directly from
>       * the ring */
> @@ -198,16 +199,18 @@ static void xen_9pfs_init_in_iov_from_pdu(V9fsPDU
> *pdu, g_free(ring->sg);
> 
>      ring->sg = g_new0(struct iovec, 2);
> -    xen_9pfs_in_sg(ring, ring->sg, &num, pdu->idx, size);
> +    ring->co = qemu_coroutine_self();
> +    smp_wmb();
> 
> +again:
> +    xen_9pfs_in_sg(ring, ring->sg, &num, pdu->idx, size);
>      buf_size = iov_size(ring->sg, num);
>      if (buf_size  < size) {
> -        xen_pv_printf(&xen_9pfs->xendev, 0, "Xen 9pfs request type %d"
> -                "needs %zu bytes, buffer has %zu\n", pdu->id, size,
> -                buf_size);
> -        xen_be_set_state(&xen_9pfs->xendev, XenbusStateClosing);
> -        xen_9pfs_disconnect(&xen_9pfs->xendev);
> +        qemu_coroutine_yield();
> +        goto again;
>      }
> +    ring->co = NULL;
> +    smp_wmb();
> 
>      *piov = ring->sg;
>      *pniov = num;
> @@ -292,6 +295,19 @@ static int xen_9pfs_receive(Xen9pfsRing *ring)
>  static void xen_9pfs_bh(void *opaque)
>  {
>      Xen9pfsRing *ring = opaque;
> +    bool wait;
> +
> +again:
> +    wait = ring->co != NULL && qemu_coroutine_entered(ring->co);
> +    smp_rmb();
> +    if (wait) {
> +        cpu_relax();
> +        goto again;
> +    }
> +
> +    if (ring->co != NULL) {
> +        qemu_coroutine_enter_if_inactive(ring->co);

... correct me if I am wrong, but AFAIK qemu_coroutine_enter_if_inactive()
will simply run the coroutine directly on the caller's thread; it will not
dispatch the coroutine onto the thread which yielded it before.
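
For illustration, a minimal standalone sketch of what I mean (not from the
patch, names are made up): the coroutine continues on whichever thread
enters it, not on the thread that originally yielded it.

#include "qemu/osdep.h"
#include "qemu/coroutine.h"

static void coroutine_fn example_co(void *opaque)
{
    /* runs on the thread that first entered the coroutine ... */
    qemu_coroutine_yield();
    /* ... and continues here on the thread that re-enters it later */
}

static void example(void)
{
    Coroutine *co = qemu_coroutine_create(example_co, NULL);
    qemu_coroutine_enter(co);             /* e.g. from the request path */
    /* later, possibly from a bottom half running on another thread: */
    qemu_coroutine_enter_if_inactive(co); /* example_co resumes right here */
}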

> +    }
>      xen_9pfs_receive(ring);
>  }

AFAICS you have not addressed the problem of msize >> Xen ring buffer size,
in which case I would expect the Xen driver to loop forever. Am I missing
something, or have you postponed addressing this?

Best regards,
Christian Schoenebeck
Stefano Stabellini May 20, 2020, 5:35 p.m. UTC | #2
On Wed, 20 May 2020, Christian Schoenebeck wrote:
> On Mittwoch, 20. Mai 2020 03:47:12 CEST Stefano Stabellini wrote:
> > From: Stefano Stabellini <stefano.stabellini@xilinx.com>
> > 
> > Instead of truncating replies, which is problematic, wait until the
> > client reads more data and frees bytes on the reply ring.
> > 
> > Do that by calling qemu_coroutine_yield(). The corresponding
> > qemu_coroutine_enter_if_inactive() is called from xen_9pfs_bh upon
> > receiving the next notification from the client.
> > 
> > We need to be careful to avoid races in case xen_9pfs_bh and the
> > coroutine are both active at the same time. In xen_9pfs_bh, wait until
> > either the critical section is over (ring->co == NULL) or until the
> > coroutine becomes inactive (qemu_coroutine_yield() was called) before
> > continuing. Then, simply wake up the coroutine if it is inactive.
> > 
> > Signed-off-by: Stefano Stabellini <stefano.stabellini@xilinx.com>
> > ---
> 
> In general this patch makes sense to me, and much better and cleaner solution 
> than what we discussed before. Just one detail ...
> 
> >  hw/9pfs/xen-9p-backend.c | 28 ++++++++++++++++++++++------
> >  1 file changed, 22 insertions(+), 6 deletions(-)
> > 
> > diff --git a/hw/9pfs/xen-9p-backend.c b/hw/9pfs/xen-9p-backend.c
> > index fc197f6c8a..3939539028 100644
> > --- a/hw/9pfs/xen-9p-backend.c
> > +++ b/hw/9pfs/xen-9p-backend.c
> > @@ -37,6 +37,7 @@ typedef struct Xen9pfsRing {
> > 
> >      struct iovec *sg;
> >      QEMUBH *bh;
> > +    Coroutine *co;
> > 
> >      /* local copies, so that we can read/write PDU data directly from
> >       * the ring */
> > @@ -198,16 +199,18 @@ static void xen_9pfs_init_in_iov_from_pdu(V9fsPDU
> > *pdu, g_free(ring->sg);
> > 
> >      ring->sg = g_new0(struct iovec, 2);
> > -    xen_9pfs_in_sg(ring, ring->sg, &num, pdu->idx, size);
> > +    ring->co = qemu_coroutine_self();
> > +    smp_wmb();
> > 
> > +again:
> > +    xen_9pfs_in_sg(ring, ring->sg, &num, pdu->idx, size);
> >      buf_size = iov_size(ring->sg, num);
> >      if (buf_size  < size) {
> > -        xen_pv_printf(&xen_9pfs->xendev, 0, "Xen 9pfs request type %d"
> > -                "needs %zu bytes, buffer has %zu\n", pdu->id, size,
> > -                buf_size);
> > -        xen_be_set_state(&xen_9pfs->xendev, XenbusStateClosing);
> > -        xen_9pfs_disconnect(&xen_9pfs->xendev);
> > +        qemu_coroutine_yield();
> > +        goto again;
> >      }
> > +    ring->co = NULL;
> > +    smp_wmb();
> > 
> >      *piov = ring->sg;
> >      *pniov = num;
> > @@ -292,6 +295,19 @@ static int xen_9pfs_receive(Xen9pfsRing *ring)
> >  static void xen_9pfs_bh(void *opaque)
> >  {
> >      Xen9pfsRing *ring = opaque;
> > +    bool wait;
> > +
> > +again:
> > +    wait = ring->co != NULL && qemu_coroutine_entered(ring->co);
> > +    smp_rmb();
> > +    if (wait) {
> > +        cpu_relax();
> > +        goto again;
> > +    }
> > +
> > +    if (ring->co != NULL) {
> > +        qemu_coroutine_enter_if_inactive(ring->co);
> 
> ... correct me if I am wrong, but AFAIK qemu_coroutine_enter_if_inactive() 
> will simply run the coroutine directly on caller's thread, it will not 
> dispatch the coroutine onto the thread which yielded the coroutine before.

Yes, that is correct. I thought it would be fine because the caller here
is a bh function so it should have no problems entering the coroutine.

But I am not that much of an expert on coroutines... Do you think there
could be issues?


> > +    }
> >      xen_9pfs_receive(ring);
> >  }
> 
> AFAICS you have not addressed the problem msize >> xen ringbuffer size, in 
> which case I would expect the Xen driver to loop forever. Am I missing 
> something or have you postponed addressing this?

Yes, I postponed addressing that issue for a couple of reasons.

For starters, strictly speaking it should not be required: msize cannot be
bigger than the ring, but it can be equal to the ring size, increasing the
chances of having to wait in QEMU. It should still be correct, but the
performance might not be great.

The other reason is that I already have the patches for both QEMU and
Linux, but I am seeing a strange error when setting order = 10. Order = 9
works fine. I would like to do a bit more investigation before sending
those patches.
Stefano Stabellini May 20, 2020, 6:46 p.m. UTC | #3
On Wed, 20 May 2020, Stefano Stabellini wrote:
> On Wed, 20 May 2020, Christian Schoenebeck wrote:
> > On Mittwoch, 20. Mai 2020 03:47:12 CEST Stefano Stabellini wrote:
> > > From: Stefano Stabellini <stefano.stabellini@xilinx.com>
> > > 
> > > Instead of truncating replies, which is problematic, wait until the
> > > client reads more data and frees bytes on the reply ring.
> > > 
> > > Do that by calling qemu_coroutine_yield(). The corresponding
> > > qemu_coroutine_enter_if_inactive() is called from xen_9pfs_bh upon
> > > receiving the next notification from the client.
> > > 
> > > We need to be careful to avoid races in case xen_9pfs_bh and the
> > > coroutine are both active at the same time. In xen_9pfs_bh, wait until
> > > either the critical section is over (ring->co == NULL) or until the
> > > coroutine becomes inactive (qemu_coroutine_yield() was called) before
> > > continuing. Then, simply wake up the coroutine if it is inactive.
> > > 
> > > Signed-off-by: Stefano Stabellini <stefano.stabellini@xilinx.com>
> > > ---
> > 
> > In general this patch makes sense to me, and much better and cleaner solution 
> > than what we discussed before. Just one detail ...
> > 
> > >  hw/9pfs/xen-9p-backend.c | 28 ++++++++++++++++++++++------
> > >  1 file changed, 22 insertions(+), 6 deletions(-)
> > > 
> > > diff --git a/hw/9pfs/xen-9p-backend.c b/hw/9pfs/xen-9p-backend.c
> > > index fc197f6c8a..3939539028 100644
> > > --- a/hw/9pfs/xen-9p-backend.c
> > > +++ b/hw/9pfs/xen-9p-backend.c
> > > @@ -37,6 +37,7 @@ typedef struct Xen9pfsRing {
> > > 
> > >      struct iovec *sg;
> > >      QEMUBH *bh;
> > > +    Coroutine *co;
> > > 
> > >      /* local copies, so that we can read/write PDU data directly from
> > >       * the ring */
> > > @@ -198,16 +199,18 @@ static void xen_9pfs_init_in_iov_from_pdu(V9fsPDU
> > > *pdu, g_free(ring->sg);
> > > 
> > >      ring->sg = g_new0(struct iovec, 2);
> > > -    xen_9pfs_in_sg(ring, ring->sg, &num, pdu->idx, size);
> > > +    ring->co = qemu_coroutine_self();
> > > +    smp_wmb();
> > > 
> > > +again:
> > > +    xen_9pfs_in_sg(ring, ring->sg, &num, pdu->idx, size);
> > >      buf_size = iov_size(ring->sg, num);
> > >      if (buf_size  < size) {
> > > -        xen_pv_printf(&xen_9pfs->xendev, 0, "Xen 9pfs request type %d"
> > > -                "needs %zu bytes, buffer has %zu\n", pdu->id, size,
> > > -                buf_size);
> > > -        xen_be_set_state(&xen_9pfs->xendev, XenbusStateClosing);
> > > -        xen_9pfs_disconnect(&xen_9pfs->xendev);
> > > +        qemu_coroutine_yield();
> > > +        goto again;
> > >      }
> > > +    ring->co = NULL;
> > > +    smp_wmb();
> > > 
> > >      *piov = ring->sg;
> > >      *pniov = num;
> > > @@ -292,6 +295,19 @@ static int xen_9pfs_receive(Xen9pfsRing *ring)
> > >  static void xen_9pfs_bh(void *opaque)
> > >  {
> > >      Xen9pfsRing *ring = opaque;
> > > +    bool wait;
> > > +
> > > +again:
> > > +    wait = ring->co != NULL && qemu_coroutine_entered(ring->co);
> > > +    smp_rmb();
> > > +    if (wait) {
> > > +        cpu_relax();
> > > +        goto again;
> > > +    }
> > > +
> > > +    if (ring->co != NULL) {
> > > +        qemu_coroutine_enter_if_inactive(ring->co);
> > 
> > ... correct me if I am wrong, but AFAIK qemu_coroutine_enter_if_inactive() 
> > will simply run the coroutine directly on caller's thread, it will not 
> > dispatch the coroutine onto the thread which yielded the coroutine before.
> 
> Yes, that is correct. I thought it would be fine because the caller here
> is a bh function so it should have no problems entering the coroutine.
> 
> But I am not that much of an expert on coroutines... Do you think there
> could be issues?
> 
> 
> > > +    }
> > >      xen_9pfs_receive(ring);
> > >  }
> > 
> > AFAICS you have not addressed the problem msize >> xen ringbuffer size, in 
> > which case I would expect the Xen driver to loop forever. Am I missing 
> > something or have you postponed addressing this?
> 
> Yes, I postponed addressing that issue because of a couple of reasons.
> 
> For starter strictly speaking it should not be required: msize cannot be
> bigger than the ring, but it can be equal to the ring increasing the
> chances of having to wait in QEMU. It should still be correct but the
> performance might not be great.
> 
> The other reason is that I already have the patches for both QEMU and
> Linux, but I am seeing a strange error setting order = 10. Order = 9
> works fine. I would like to do a bit more investigation before sending
> those patches.

As it turns out, order 9 is the max that the protocol allows at the
moment, so I am going to add a patch increasing the max order supported
by QEMU to 9 in the next version of this series.

The msize limitation to order 8 (order 9 - 1) is done on the Linux side
(https://marc.info/?l=linux-kernel&m=159000008631935&w=2).
Christian Schoenebeck May 21, 2020, 11:51 a.m. UTC | #4
On Mittwoch, 20. Mai 2020 19:35:00 CEST Stefano Stabellini wrote:
> On Wed, 20 May 2020, Christian Schoenebeck wrote:
> > On Mittwoch, 20. Mai 2020 03:47:12 CEST Stefano Stabellini wrote:
> > > From: Stefano Stabellini <stefano.stabellini@xilinx.com>
> > > 
> > > Instead of truncating replies, which is problematic, wait until the
> > > client reads more data and frees bytes on the reply ring.
> > > 
> > > Do that by calling qemu_coroutine_yield(). The corresponding
> > > qemu_coroutine_enter_if_inactive() is called from xen_9pfs_bh upon
> > > receiving the next notification from the client.
> > > 
> > > We need to be careful to avoid races in case xen_9pfs_bh and the
> > > coroutine are both active at the same time. In xen_9pfs_bh, wait until
> > > either the critical section is over (ring->co == NULL) or until the
> > > coroutine becomes inactive (qemu_coroutine_yield() was called) before
> > > continuing. Then, simply wake up the coroutine if it is inactive.
> > > 
> > > Signed-off-by: Stefano Stabellini <stefano.stabellini@xilinx.com>
> > > ---
> > 
> > In general this patch makes sense to me, and much better and cleaner
> > solution than what we discussed before. Just one detail ...
> > 
> > >  hw/9pfs/xen-9p-backend.c | 28 ++++++++++++++++++++++------
> > >  1 file changed, 22 insertions(+), 6 deletions(-)
> > > 
> > > diff --git a/hw/9pfs/xen-9p-backend.c b/hw/9pfs/xen-9p-backend.c
> > > index fc197f6c8a..3939539028 100644
> > > --- a/hw/9pfs/xen-9p-backend.c
> > > +++ b/hw/9pfs/xen-9p-backend.c
> > > @@ -37,6 +37,7 @@ typedef struct Xen9pfsRing {
> > > 
> > >      struct iovec *sg;
> > >      QEMUBH *bh;
> > > 
> > > +    Coroutine *co;
> > > 
> > >      /* local copies, so that we can read/write PDU data directly from
> > >      
> > >       * the ring */
> > > 
> > > @@ -198,16 +199,18 @@ static void xen_9pfs_init_in_iov_from_pdu(V9fsPDU
> > > *pdu, g_free(ring->sg);
> > > 
> > >      ring->sg = g_new0(struct iovec, 2);
> > > 
> > > -    xen_9pfs_in_sg(ring, ring->sg, &num, pdu->idx, size);
> > > +    ring->co = qemu_coroutine_self();
> > > +    smp_wmb();
> > > 
> > > +again:
> > > +    xen_9pfs_in_sg(ring, ring->sg, &num, pdu->idx, size);
> > > 
> > >      buf_size = iov_size(ring->sg, num);
> > >      if (buf_size  < size) {
> > > 
> > > -        xen_pv_printf(&xen_9pfs->xendev, 0, "Xen 9pfs request type %d"
> > > -                "needs %zu bytes, buffer has %zu\n", pdu->id, size,
> > > -                buf_size);
> > > -        xen_be_set_state(&xen_9pfs->xendev, XenbusStateClosing);
> > > -        xen_9pfs_disconnect(&xen_9pfs->xendev);
> > > +        qemu_coroutine_yield();
> > > +        goto again;
> > > 
> > >      }
> > > 
> > > +    ring->co = NULL;
> > > +    smp_wmb();
> > > 
> > >      *piov = ring->sg;
> > >      *pniov = num;
> > > 
> > > @@ -292,6 +295,19 @@ static int xen_9pfs_receive(Xen9pfsRing *ring)
> > > 
> > >  static void xen_9pfs_bh(void *opaque)
> > >  {
> > >  
> > >      Xen9pfsRing *ring = opaque;
> > > 
> > > +    bool wait;
> > > +
> > > +again:
> > > +    wait = ring->co != NULL && qemu_coroutine_entered(ring->co);
> > > +    smp_rmb();
> > > +    if (wait) {
> > > +        cpu_relax();
> > > +        goto again;
> > > +    }
> > > +
> > > +    if (ring->co != NULL) {
> > > +        qemu_coroutine_enter_if_inactive(ring->co);
> > 
> > ... correct me if I am wrong, but AFAIK qemu_coroutine_enter_if_inactive()
> > will simply run the coroutine directly on caller's thread, it will not
> > dispatch the coroutine onto the thread which yielded the coroutine before.
> 
> Yes, that is correct. I thought it would be fine because the caller here
> is a bh function so it should have no problems entering the coroutine.
> 
> But I am not that much of an expert on coroutines... Do you think there
> could be issues?

There is nothing specific that comes to my mind right now, no. I just
wanted to make sure you are aware of which thread that's going to be
executed on.

Remember we're all working on 9pfs only on the side, so nobody right now
has 100% code base coverage in their head. Most of the 9pfs code assumes
it is run on the main I/O thread only. On other projects I usually guard
those functions like this:

#define MAIN_THREAD_ONLY() \
	assert(isMainThread())

void foo() {
	MAIN_THREAD_ONLY();
	...
}

Might make sense to add something like that to 9pfs as well at some point.
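
In QEMU terms, such a guard could be sketched roughly like this (just an
illustration, the function name is a placeholder; whether comparing
AioContexts is the right notion of "main I/O thread" for 9pfs is an
assumption on my part):

#include "qemu/osdep.h"
#include "qemu/main-loop.h"
#include "block/aio.h"

/* assert that we are running in the main loop's AioContext */
#define MAIN_THREAD_ONLY() \
    assert(qemu_get_current_aio_context() == qemu_get_aio_context())

static void some_9pfs_op(void)
{
    MAIN_THREAD_ONLY();
    /* ... code that must only run on the main I/O thread ... */
}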

> > > +    }
> > > 
> > >      xen_9pfs_receive(ring);
> > >  
> > >  }
> > 
> > AFAICS you have not addressed the problem msize >> xen ringbuffer size, in
> > which case I would expect the Xen driver to loop forever. Am I missing
> > something or have you postponed addressing this?
> 
> Yes, I postponed addressing that issue because of a couple of reasons.
> 
> For starter strictly speaking it should not be required: msize cannot be
> bigger than the ring, but it can be equal to the ring increasing the
> chances of having to wait in QEMU. It should still be correct but the
> performance might not be great.

Ah right, I actually mixed some things up on the Xen side, my bad! You're
right, the introduced loop should be sufficient to avoid misbehaviours now.
So there would really just be a potential performance penalty, but that's
more of a luxury issue that could be addressed in the future.

Good then, maybe just add some comments on the memory barriers to shut up
the code style warning as suggested by Greg; apart from that ...

Reviewed-by: Christian Schoenebeck <qemu_oss@crudebyte.com>

Best regards,
Christian Schoenebeck

Patch

diff --git a/hw/9pfs/xen-9p-backend.c b/hw/9pfs/xen-9p-backend.c
index fc197f6c8a..3939539028 100644
--- a/hw/9pfs/xen-9p-backend.c
+++ b/hw/9pfs/xen-9p-backend.c
@@ -37,6 +37,7 @@  typedef struct Xen9pfsRing {
 
     struct iovec *sg;
     QEMUBH *bh;
+    Coroutine *co;
 
     /* local copies, so that we can read/write PDU data directly from
      * the ring */
@@ -198,16 +199,18 @@  static void xen_9pfs_init_in_iov_from_pdu(V9fsPDU *pdu,
     g_free(ring->sg);
 
     ring->sg = g_new0(struct iovec, 2);
-    xen_9pfs_in_sg(ring, ring->sg, &num, pdu->idx, size);
+    ring->co = qemu_coroutine_self();
+    smp_wmb();
 
+again:
+    xen_9pfs_in_sg(ring, ring->sg, &num, pdu->idx, size);
     buf_size = iov_size(ring->sg, num);
     if (buf_size  < size) {
-        xen_pv_printf(&xen_9pfs->xendev, 0, "Xen 9pfs request type %d"
-                "needs %zu bytes, buffer has %zu\n", pdu->id, size,
-                buf_size);
-        xen_be_set_state(&xen_9pfs->xendev, XenbusStateClosing);
-        xen_9pfs_disconnect(&xen_9pfs->xendev);
+        qemu_coroutine_yield();
+        goto again;
     }
+    ring->co = NULL;
+    smp_wmb();
 
     *piov = ring->sg;
     *pniov = num;
@@ -292,6 +295,19 @@  static int xen_9pfs_receive(Xen9pfsRing *ring)
 static void xen_9pfs_bh(void *opaque)
 {
     Xen9pfsRing *ring = opaque;
+    bool wait;
+
+again:
+    wait = ring->co != NULL && qemu_coroutine_entered(ring->co);
+    smp_rmb();
+    if (wait) {
+        cpu_relax();
+        goto again;
+    }
+
+    if (ring->co != NULL) {
+        qemu_coroutine_enter_if_inactive(ring->co);
+    }
     xen_9pfs_receive(ring);
 }