diff mbox series

[6/6] nbd/server: introduce NBDClient->lock to protect fields

Message ID 20231221014903.1537962-7-stefanha@redhat.com (mailing list archive)
State New, archived
Headers show
Series qemu-iotests fixes for Kevin's block tree | expand

Commit Message

Stefan Hajnoczi Dec. 21, 2023, 1:49 a.m. UTC
NBDClient has a number of fields that are accessed by both the export
AioContext and the main loop thread. When the AioContext lock is removed
these fields will need another form of protection.

Add NBDClient->lock and protect fields that are accessed by both
threads. Also add assertions where possible and otherwise add doc
comments stating assumptions about which thread and lock holding.

Note this patch moves the client->recv_coroutine assertion from
nbd_co_receive_request() to nbd_trip() where client->lock is held.

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
 nbd/server.c | 128 +++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 94 insertions(+), 34 deletions(-)

Comments

Paolo Bonzini Dec. 21, 2023, 7:26 a.m. UTC | #1
On 12/21/23 02:49, Stefan Hajnoczi wrote:
>       nbd_client_receive_next_request(client);
> +
> +    qemu_mutex_unlock(&client->lock);
> +
>       if (ret == -EIO) {
>           goto disconnect;
>       }

I think I slightly prefer if disconnect is reached with lock taken, for 
consistency with the "done" label.  It does not complicate the code,
because you can just move qio_channel_set_cork() and replace:

> @@ -3024,8 +3072,10 @@ static coroutine_fn void nbd_trip(void *opaque)
>       }
>   
>       qio_channel_set_cork(client->ioc, false);
> +    qemu_mutex_lock(&client->lock);

with:

+    qio_channel_set_cork(client->ioc, false);
+    qemu_mutex_lock(&client->lock);

      if (ret < 0) {
          error_prepend(&local_err, "Failed to send reply: ");
          goto disconnect;
      }

      /*
       * We must disconnect after NBD_CMD_WRITE or BLOCK_STATUS with
       * payload if we did not read the payload.
       */
      if (!req->complete) {
          error_setg(&local_err, "Request handling failed in 
intermediate state");
          goto disconnect;
      }
-    qio_channel_set_cork(client->ioc, false);
  done:

Thanks,

Paolo

>   done:
>       nbd_request_put(req);
> +    qemu_mutex_unlock(&client->lock);
Kevin Wolf Dec. 21, 2023, 10:45 a.m. UTC | #2
Am 21.12.2023 um 02:49 hat Stefan Hajnoczi geschrieben:
> NBDClient has a number of fields that are accessed by both the export
> AioContext and the main loop thread. When the AioContext lock is removed
> these fields will need another form of protection.
> 
> Add NBDClient->lock and protect fields that are accessed by both
> threads. Also add assertions where possible and otherwise add doc
> comments stating assumptions about which thread and lock holding.
> 
> Note this patch moves the client->recv_coroutine assertion from
> nbd_co_receive_request() to nbd_trip() where client->lock is held.
> 
> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
> ---
>  nbd/server.c | 128 +++++++++++++++++++++++++++++++++++++--------------
>  1 file changed, 94 insertions(+), 34 deletions(-)
> 
> diff --git a/nbd/server.c b/nbd/server.c
> index 527fbdab4a..4008ec7df9 100644
> --- a/nbd/server.c
> +++ b/nbd/server.c
> @@ -125,23 +125,25 @@ struct NBDClient {
>      int refcount; /* atomic */
>      void (*close_fn)(NBDClient *client, bool negotiated);
>  
> +    QemuMutex lock;
> +
>      NBDExport *exp;
>      QCryptoTLSCreds *tlscreds;
>      char *tlsauthz;
>      QIOChannelSocket *sioc; /* The underlying data channel */
>      QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
>  
> -    Coroutine *recv_coroutine;
> +    Coroutine *recv_coroutine; /* protected by lock */
>  
>      CoMutex send_lock;
>      Coroutine *send_coroutine;
>  
> -    bool read_yielding;
> -    bool quiescing;
> +    bool read_yielding; /* protected by lock */
> +    bool quiescing; /* protected by lock */
>  
>      QTAILQ_ENTRY(NBDClient) next;
> -    int nb_requests;
> -    bool closing;
> +    int nb_requests; /* protected by lock */
> +    bool closing; /* protected by lock */
>  
>      uint32_t check_align; /* If non-zero, check for aligned client requests */
>  
> @@ -1415,11 +1417,18 @@ nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)
>  
>          len = qio_channel_readv(client->ioc, &iov, 1, errp);
>          if (len == QIO_CHANNEL_ERR_BLOCK) {
> -            client->read_yielding = true;
> +            WITH_QEMU_LOCK_GUARD(&client->lock) {
> +                if (client->quiescing) {
> +                    return -EAGAIN;
> +                }

Why did you add another client->quiescing check here?

If it is to address a race, I think you only made the window a bit
smaller, but between releasing the lock and yielding the field could
still change, so drain needs to handle this case anyway.

> +                client->read_yielding = true;
> +            }
>              qio_channel_yield(client->ioc, G_IO_IN);
> -            client->read_yielding = false;
> -            if (client->quiescing) {
> -                return -EAGAIN;
> +            WITH_QEMU_LOCK_GUARD(&client->lock) {
> +                client->read_yielding = false;
> +                if (client->quiescing) {
> +                    return -EAGAIN;
> +                }
>              }
>              continue;
>          } else if (len < 0) {
> @@ -1528,6 +1537,7 @@ void nbd_client_put(NBDClient *client)
>              blk_exp_unref(&client->exp->common);
>          }
>          g_free(client->contexts.bitmaps);
> +        qemu_mutex_destroy(&client->lock);
>          g_free(client);
>      }
>  }
> @@ -1536,11 +1546,13 @@ static void client_close(NBDClient *client, bool negotiated)
>  {
>      assert(qemu_in_main_thread());
>  
> -    if (client->closing) {
> -        return;
> -    }
> +    WITH_QEMU_LOCK_GUARD(&client->lock) {
> +        if (client->closing) {
> +            return;
> +        }
>  
> -    client->closing = true;
> +        client->closing = true;
> +    }
>  
>      /* Force requests to finish.  They will drop their own references,
>       * then we'll close the socket and free the NBDClient.
> @@ -1554,6 +1566,7 @@ static void client_close(NBDClient *client, bool negotiated)
>      }
>  }
>  
> +/* Runs in export AioContext with client->lock held */
>  static NBDRequestData *nbd_request_get(NBDClient *client)
>  {
>      NBDRequestData *req;
> @@ -1566,6 +1579,7 @@ static NBDRequestData *nbd_request_get(NBDClient *client)
>      return req;
>  }
>  
> +/* Runs in export AioContext with client->lock held */
>  static void nbd_request_put(NBDRequestData *req)
>  {
>      NBDClient *client = req->client;
> @@ -1589,14 +1603,18 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
>      NBDExport *exp = opaque;
>      NBDClient *client;
>  
> +    assert(qemu_in_main_thread());
> +
>      trace_nbd_blk_aio_attached(exp->name, ctx);
>  
>      exp->common.ctx = ctx;
>  
>      QTAILQ_FOREACH(client, &exp->clients, next) {
> -        assert(client->nb_requests == 0);
> -        assert(client->recv_coroutine == NULL);
> -        assert(client->send_coroutine == NULL);
> +        WITH_QEMU_LOCK_GUARD(&client->lock) {
> +            assert(client->nb_requests == 0);
> +            assert(client->recv_coroutine == NULL);
> +            assert(client->send_coroutine == NULL);
> +        }
>      }
>  }
>  
> @@ -1604,6 +1622,8 @@ static void blk_aio_detach(void *opaque)
>  {
>      NBDExport *exp = opaque;
>  
> +    assert(qemu_in_main_thread());
> +
>      trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
>  
>      exp->common.ctx = NULL;
> @@ -1614,8 +1634,12 @@ static void nbd_drained_begin(void *opaque)
>      NBDExport *exp = opaque;
>      NBDClient *client;
>  
> +    assert(qemu_in_main_thread());
> +
>      QTAILQ_FOREACH(client, &exp->clients, next) {
> -        client->quiescing = true;
> +        WITH_QEMU_LOCK_GUARD(&client->lock) {
> +            client->quiescing = true;
> +        }
>      }
>  }
>  
> @@ -1624,9 +1648,13 @@ static void nbd_drained_end(void *opaque)
>      NBDExport *exp = opaque;
>      NBDClient *client;
>  
> +    assert(qemu_in_main_thread());
> +
>      QTAILQ_FOREACH(client, &exp->clients, next) {
> -        client->quiescing = false;
> -        nbd_client_receive_next_request(client);
> +        WITH_QEMU_LOCK_GUARD(&client->lock) {
> +            client->quiescing = false;
> +            nbd_client_receive_next_request(client);
> +        }
>      }
>  }
>  
> @@ -1635,17 +1663,21 @@ static bool nbd_drained_poll(void *opaque)
>      NBDExport *exp = opaque;
>      NBDClient *client;
>  
> +    assert(qemu_in_main_thread());
> +
>      QTAILQ_FOREACH(client, &exp->clients, next) {
> -        if (client->nb_requests != 0) {
> -            /*
> -             * If there's a coroutine waiting for a request on nbd_read_eof()
> -             * enter it here so we don't depend on the client to wake it up.
> -             */
> -            if (client->recv_coroutine != NULL && client->read_yielding) {
> -                qio_channel_wake_read(client->ioc);
> +        WITH_QEMU_LOCK_GUARD(&client->lock) {
> +            if (client->nb_requests != 0) {
> +                /*
> +                 * If there's a coroutine waiting for a request on nbd_read_eof()
> +                 * enter it here so we don't depend on the client to wake it up.
> +                 */
> +                if (client->recv_coroutine != NULL && client->read_yielding) {
> +                    qio_channel_wake_read(client->ioc);
> +                }

This is where the race from above becomes relevant.

Let's first look at calling qio_channel_wake_read() a tiny bit too
early: Without any locking in qio_channel_yield(), we could catch the
read coroutine between setting ioc->read_coroutine and before actually
yielding. In this case we would call aio_co_wake() on a coroutine that
is still running in a different thread. Since it's in a different
thread, we only schedule it instead entering it directly, and that just
works. The coroutine will immediately be reentered, which is exactly
what we want.

Even earlier calls of qio_channel_wake_read() (i.e. between setting
client->read_yielding and setting ioc->read_coroutine) don't actively
hurt, they just don't do anything if no read is in flight. (This is the
same case as if the nbd_trip() coroutine didn't even set
client->read_yielding yet, just that the check you added above can't
catch it.)

So if nbd_read_eof() didn't yield yet, we don't wake it here, but we
still return true, so the next drained_poll call will try again.

This is good in principle, but it depends on waking up the main thread
when we made progress. So we have to call aio_wait_kick() between
setting ioc->read_coroutine and yielding to make this work. What we
actually may get indirectly is an aio_notify() through setting FD
handlers if all implementations of qio_channel_set_aio_fd_handler()
actually do that. I suppose this could be enough?

Anyway, if my result after thinking really hard about this is "I can't
rule out that it's correct", maybe it would be better to just run this
code in the export AioContext instead so that we don't have to think
about all the subtleties and know that the nbd_co_trip() coroutine is at
a yield point?

> +
> +                return true;
>              }
> -
> -            return true;
>          }
>      }
>  
> @@ -1656,6 +1688,8 @@ static void nbd_eject_notifier(Notifier *n, void *data)
>  {
>      NBDExport *exp = container_of(n, NBDExport, eject_notifier);
>  
> +    assert(qemu_in_main_thread());
> +
>      blk_exp_request_shutdown(&exp->common);
>  }
>  
> @@ -2541,7 +2575,6 @@ static int coroutine_fn nbd_co_receive_request(NBDRequestData *req,
>      int ret;
>  
>      g_assert(qemu_in_coroutine());
> -    assert(client->recv_coroutine == qemu_coroutine_self());
>      ret = nbd_receive_request(client, request, errp);
>      if (ret < 0) {
>          return ret;
> @@ -2950,7 +2983,11 @@ static coroutine_fn void nbd_trip(void *opaque)
>       */
>  
>      trace_nbd_trip();
> +
> +    qemu_mutex_lock(&client->lock);
> +
>      if (client->closing) {
> +        qemu_mutex_unlock(&client->lock);
>          aio_co_reschedule_self(qemu_get_aio_context());
>          nbd_client_put(client);
>          return;
> @@ -2961,15 +2998,24 @@ static coroutine_fn void nbd_trip(void *opaque)
>           * We're switching between AIO contexts. Don't attempt to receive a new
>           * request and kick the main context which may be waiting for us.
>           */
> -        aio_co_reschedule_self(qemu_get_aio_context());
> -        nbd_client_put(client);
>          client->recv_coroutine = NULL;
> +        qemu_mutex_unlock(&client->lock);
>          aio_wait_kick();
> +
> +        aio_co_reschedule_self(qemu_get_aio_context());
> +        nbd_client_put(client);
>          return;
>      }
>  
>      req = nbd_request_get(client);
> -    ret = nbd_co_receive_request(req, &request, &local_err);
> +
> +    do {
> +        assert(client->recv_coroutine == qemu_coroutine_self());
> +        qemu_mutex_unlock(&client->lock);
> +        ret = nbd_co_receive_request(req, &request, &local_err);
> +        qemu_mutex_lock(&client->lock);
> +    } while (ret == -EAGAIN && !client->quiescing);

I think this deserves a comment to say that the loop is only about the
drain case without polling where drained_end has already happened before
we reach this point, so we may not terminate the coroutine any more
because nothing would restart it.

>      client->recv_coroutine = NULL;

As soon as we're past this, the nbd_client_receive_next_request() called
by drained_end will create a new coroutine, so we don't have to be
careful about the same case after this.

Kevin
Stefan Hajnoczi Dec. 21, 2023, 11:56 a.m. UTC | #3
On Thu, Dec 21, 2023 at 08:26:58AM +0100, Paolo Bonzini wrote:
> On 12/21/23 02:49, Stefan Hajnoczi wrote:
> >       nbd_client_receive_next_request(client);
> > +
> > +    qemu_mutex_unlock(&client->lock);
> > +
> >       if (ret == -EIO) {
> >           goto disconnect;
> >       }
> 
> I think I slightly prefer if disconnect is reached with lock taken, for
> consistency with the "done" label.  It does not complicate the code,
> because you can just move qio_channel_set_cork() and replace:

Yes, that makes the code easier to follow. Will fix in v2.

Stefan
Stefan Hajnoczi Dec. 21, 2023, 2:14 p.m. UTC | #4
On Thu, Dec 21, 2023 at 11:45:36AM +0100, Kevin Wolf wrote:
> Am 21.12.2023 um 02:49 hat Stefan Hajnoczi geschrieben:
> > NBDClient has a number of fields that are accessed by both the export
> > AioContext and the main loop thread. When the AioContext lock is removed
> > these fields will need another form of protection.
> > 
> > Add NBDClient->lock and protect fields that are accessed by both
> > threads. Also add assertions where possible and otherwise add doc
> > comments stating assumptions about which thread and lock holding.
> > 
> > Note this patch moves the client->recv_coroutine assertion from
> > nbd_co_receive_request() to nbd_trip() where client->lock is held.
> > 
> > Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
> > ---
> >  nbd/server.c | 128 +++++++++++++++++++++++++++++++++++++--------------
> >  1 file changed, 94 insertions(+), 34 deletions(-)
> > 
> > diff --git a/nbd/server.c b/nbd/server.c
> > index 527fbdab4a..4008ec7df9 100644
> > --- a/nbd/server.c
> > +++ b/nbd/server.c
> > @@ -125,23 +125,25 @@ struct NBDClient {
> >      int refcount; /* atomic */
> >      void (*close_fn)(NBDClient *client, bool negotiated);
> >  
> > +    QemuMutex lock;
> > +
> >      NBDExport *exp;
> >      QCryptoTLSCreds *tlscreds;
> >      char *tlsauthz;
> >      QIOChannelSocket *sioc; /* The underlying data channel */
> >      QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
> >  
> > -    Coroutine *recv_coroutine;
> > +    Coroutine *recv_coroutine; /* protected by lock */
> >  
> >      CoMutex send_lock;
> >      Coroutine *send_coroutine;
> >  
> > -    bool read_yielding;
> > -    bool quiescing;
> > +    bool read_yielding; /* protected by lock */
> > +    bool quiescing; /* protected by lock */
> >  
> >      QTAILQ_ENTRY(NBDClient) next;
> > -    int nb_requests;
> > -    bool closing;
> > +    int nb_requests; /* protected by lock */
> > +    bool closing; /* protected by lock */
> >  
> >      uint32_t check_align; /* If non-zero, check for aligned client requests */
> >  
> > @@ -1415,11 +1417,18 @@ nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)
> >  
> >          len = qio_channel_readv(client->ioc, &iov, 1, errp);
> >          if (len == QIO_CHANNEL_ERR_BLOCK) {
> > -            client->read_yielding = true;
> > +            WITH_QEMU_LOCK_GUARD(&client->lock) {
> > +                if (client->quiescing) {
> > +                    return -EAGAIN;
> > +                }
> 
> Why did you add another client->quiescing check here?
> 
> If it is to address a race, I think you only made the window a bit
> smaller, but between releasing the lock and yielding the field could
> still change, so drain needs to handle this case anyway.

I added it for consistency/symmetry where nbd_trip() checks
client->quiescing after acquiring client->lock but didn't have any
specific scenario in mind. I'll drop this.

I agree that it does not prevent races. .drained_begin() +
.drained_poll() can run after client->lock is released and before
qio_channel_yield() takes effect. In that case we miss client->quiescing
and still have the race where no wake occurs because
qio_channel_wake_read() sees ioc->read_coroutine == NULL.

> > +                client->read_yielding = true;
> > +            }
> >              qio_channel_yield(client->ioc, G_IO_IN);
> > -            client->read_yielding = false;
> > -            if (client->quiescing) {
> > -                return -EAGAIN;
> > +            WITH_QEMU_LOCK_GUARD(&client->lock) {
> > +                client->read_yielding = false;
> > +                if (client->quiescing) {
> > +                    return -EAGAIN;
> > +                }
> >              }
> >              continue;
> >          } else if (len < 0) {
> > @@ -1528,6 +1537,7 @@ void nbd_client_put(NBDClient *client)
> >              blk_exp_unref(&client->exp->common);
> >          }
> >          g_free(client->contexts.bitmaps);
> > +        qemu_mutex_destroy(&client->lock);
> >          g_free(client);
> >      }
> >  }
> > @@ -1536,11 +1546,13 @@ static void client_close(NBDClient *client, bool negotiated)
> >  {
> >      assert(qemu_in_main_thread());
> >  
> > -    if (client->closing) {
> > -        return;
> > -    }
> > +    WITH_QEMU_LOCK_GUARD(&client->lock) {
> > +        if (client->closing) {
> > +            return;
> > +        }
> >  
> > -    client->closing = true;
> > +        client->closing = true;
> > +    }
> >  
> >      /* Force requests to finish.  They will drop their own references,
> >       * then we'll close the socket and free the NBDClient.
> > @@ -1554,6 +1566,7 @@ static void client_close(NBDClient *client, bool negotiated)
> >      }
> >  }
> >  
> > +/* Runs in export AioContext with client->lock held */
> >  static NBDRequestData *nbd_request_get(NBDClient *client)
> >  {
> >      NBDRequestData *req;
> > @@ -1566,6 +1579,7 @@ static NBDRequestData *nbd_request_get(NBDClient *client)
> >      return req;
> >  }
> >  
> > +/* Runs in export AioContext with client->lock held */
> >  static void nbd_request_put(NBDRequestData *req)
> >  {
> >      NBDClient *client = req->client;
> > @@ -1589,14 +1603,18 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
> >      NBDExport *exp = opaque;
> >      NBDClient *client;
> >  
> > +    assert(qemu_in_main_thread());
> > +
> >      trace_nbd_blk_aio_attached(exp->name, ctx);
> >  
> >      exp->common.ctx = ctx;
> >  
> >      QTAILQ_FOREACH(client, &exp->clients, next) {
> > -        assert(client->nb_requests == 0);
> > -        assert(client->recv_coroutine == NULL);
> > -        assert(client->send_coroutine == NULL);
> > +        WITH_QEMU_LOCK_GUARD(&client->lock) {
> > +            assert(client->nb_requests == 0);
> > +            assert(client->recv_coroutine == NULL);
> > +            assert(client->send_coroutine == NULL);
> > +        }
> >      }
> >  }
> >  
> > @@ -1604,6 +1622,8 @@ static void blk_aio_detach(void *opaque)
> >  {
> >      NBDExport *exp = opaque;
> >  
> > +    assert(qemu_in_main_thread());
> > +
> >      trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
> >  
> >      exp->common.ctx = NULL;
> > @@ -1614,8 +1634,12 @@ static void nbd_drained_begin(void *opaque)
> >      NBDExport *exp = opaque;
> >      NBDClient *client;
> >  
> > +    assert(qemu_in_main_thread());
> > +
> >      QTAILQ_FOREACH(client, &exp->clients, next) {
> > -        client->quiescing = true;
> > +        WITH_QEMU_LOCK_GUARD(&client->lock) {
> > +            client->quiescing = true;
> > +        }
> >      }
> >  }
> >  
> > @@ -1624,9 +1648,13 @@ static void nbd_drained_end(void *opaque)
> >      NBDExport *exp = opaque;
> >      NBDClient *client;
> >  
> > +    assert(qemu_in_main_thread());
> > +
> >      QTAILQ_FOREACH(client, &exp->clients, next) {
> > -        client->quiescing = false;
> > -        nbd_client_receive_next_request(client);
> > +        WITH_QEMU_LOCK_GUARD(&client->lock) {
> > +            client->quiescing = false;
> > +            nbd_client_receive_next_request(client);
> > +        }
> >      }
> >  }
> >  
> > @@ -1635,17 +1663,21 @@ static bool nbd_drained_poll(void *opaque)
> >      NBDExport *exp = opaque;
> >      NBDClient *client;
> >  
> > +    assert(qemu_in_main_thread());
> > +
> >      QTAILQ_FOREACH(client, &exp->clients, next) {
> > -        if (client->nb_requests != 0) {
> > -            /*
> > -             * If there's a coroutine waiting for a request on nbd_read_eof()
> > -             * enter it here so we don't depend on the client to wake it up.
> > -             */
> > -            if (client->recv_coroutine != NULL && client->read_yielding) {
> > -                qio_channel_wake_read(client->ioc);
> > +        WITH_QEMU_LOCK_GUARD(&client->lock) {
> > +            if (client->nb_requests != 0) {
> > +                /*
> > +                 * If there's a coroutine waiting for a request on nbd_read_eof()
> > +                 * enter it here so we don't depend on the client to wake it up.
> > +                 */
> > +                if (client->recv_coroutine != NULL && client->read_yielding) {
> > +                    qio_channel_wake_read(client->ioc);
> > +                }
> 
> This is where the race from above becomes relevant.
> 
> Let's first look at calling qio_channel_wake_read() a tiny bit too
> early: Without any locking in qio_channel_yield(), we could catch the
> read coroutine between setting ioc->read_coroutine and before actually
> yielding. In this case we would call aio_co_wake() on a coroutine that
> is still running in a different thread. Since it's in a different
> thread, we only schedule it instead entering it directly, and that just
> works. The coroutine will immediately be reentered, which is exactly
> what we want.
> 
> Even earlier calls of qio_channel_wake_read() (i.e. between setting
> client->read_yielding and setting ioc->read_coroutine) don't actively
> hurt, they just don't do anything if no read is in flight. (This is the
> same case as if the nbd_trip() coroutine didn't even set
> client->read_yielding yet, just that the check you added above can't
> catch it.)
> 
> So if nbd_read_eof() didn't yield yet, we don't wake it here, but we
> still return true, so the next drained_poll call will try again.
> 
> This is good in principle, but it depends on waking up the main thread
> when we made progress. So we have to call aio_wait_kick() between
> setting ioc->read_coroutine and yielding to make this work. What we
> actually may get indirectly is an aio_notify() through setting FD
> handlers if all implementations of qio_channel_set_aio_fd_handler()
> actually do that. I suppose this could be enough?

qio_channel_set_aio_fd_handler() calls aio_notify() on the export's
AioContext. It does not wake the main loop AioContext when an IOThread
is being used so I don't think it helps here.

The state where qio_channel_wake_read() misses that nbd_trip() is
yielding looks like this:

client->nb_requests > 0
client->recv_coroutine = nbd_trip() coroutine
client->quiescing = true
client->read_yielding = true
ioc->read_coroutine = NULL

The main loop thread is waiting for activity and nbd_trip() enters
qemu_coroutine_yield(). There is no progress until the main loop thread
resumes (which can be triggered by the export AioContext completing NBD
I/O too).

I guess the race isn't immediately apparent because there is usually
some event loop activity that hides the problem.

> Anyway, if my result after thinking really hard about this is "I can't
> rule out that it's correct", maybe it would be better to just run this
> code in the export AioContext instead so that we don't have to think
> about all the subtleties and know that the nbd_co_trip() coroutine is at
> a yield point?

Agreed.

> > +
> > +                return true;
> >              }
> > -
> > -            return true;
> >          }
> >      }
> >  
> > @@ -1656,6 +1688,8 @@ static void nbd_eject_notifier(Notifier *n, void *data)
> >  {
> >      NBDExport *exp = container_of(n, NBDExport, eject_notifier);
> >  
> > +    assert(qemu_in_main_thread());
> > +
> >      blk_exp_request_shutdown(&exp->common);
> >  }
> >  
> > @@ -2541,7 +2575,6 @@ static int coroutine_fn nbd_co_receive_request(NBDRequestData *req,
> >      int ret;
> >  
> >      g_assert(qemu_in_coroutine());
> > -    assert(client->recv_coroutine == qemu_coroutine_self());
> >      ret = nbd_receive_request(client, request, errp);
> >      if (ret < 0) {
> >          return ret;
> > @@ -2950,7 +2983,11 @@ static coroutine_fn void nbd_trip(void *opaque)
> >       */
> >  
> >      trace_nbd_trip();
> > +
> > +    qemu_mutex_lock(&client->lock);
> > +
> >      if (client->closing) {
> > +        qemu_mutex_unlock(&client->lock);
> >          aio_co_reschedule_self(qemu_get_aio_context());
> >          nbd_client_put(client);
> >          return;
> > @@ -2961,15 +2998,24 @@ static coroutine_fn void nbd_trip(void *opaque)
> >           * We're switching between AIO contexts. Don't attempt to receive a new
> >           * request and kick the main context which may be waiting for us.
> >           */
> > -        aio_co_reschedule_self(qemu_get_aio_context());
> > -        nbd_client_put(client);
> >          client->recv_coroutine = NULL;
> > +        qemu_mutex_unlock(&client->lock);
> >          aio_wait_kick();
> > +
> > +        aio_co_reschedule_self(qemu_get_aio_context());
> > +        nbd_client_put(client);
> >          return;
> >      }
> >  
> >      req = nbd_request_get(client);
> > -    ret = nbd_co_receive_request(req, &request, &local_err);
> > +
> > +    do {
> > +        assert(client->recv_coroutine == qemu_coroutine_self());
> > +        qemu_mutex_unlock(&client->lock);
> > +        ret = nbd_co_receive_request(req, &request, &local_err);
> > +        qemu_mutex_lock(&client->lock);
> > +    } while (ret == -EAGAIN && !client->quiescing);
> 
> I think this deserves a comment to say that the loop is only about the
> drain case without polling where drained_end has already happened before
> we reach this point, so we may not terminate the coroutine any more
> because nothing would restart it.

Sounds good, I'll add a comment in the next revision.

> >      client->recv_coroutine = NULL;
> 
> As soon as we're past this, the nbd_client_receive_next_request() called
> by drained_end will create a new coroutine, so we don't have to be
> careful about the same case after this.
> 
> Kevin
>
diff mbox series

Patch

diff --git a/nbd/server.c b/nbd/server.c
index 527fbdab4a..4008ec7df9 100644
--- a/nbd/server.c
+++ b/nbd/server.c
@@ -125,23 +125,25 @@  struct NBDClient {
     int refcount; /* atomic */
     void (*close_fn)(NBDClient *client, bool negotiated);
 
+    QemuMutex lock;
+
     NBDExport *exp;
     QCryptoTLSCreds *tlscreds;
     char *tlsauthz;
     QIOChannelSocket *sioc; /* The underlying data channel */
     QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
 
-    Coroutine *recv_coroutine;
+    Coroutine *recv_coroutine; /* protected by lock */
 
     CoMutex send_lock;
     Coroutine *send_coroutine;
 
-    bool read_yielding;
-    bool quiescing;
+    bool read_yielding; /* protected by lock */
+    bool quiescing; /* protected by lock */
 
     QTAILQ_ENTRY(NBDClient) next;
-    int nb_requests;
-    bool closing;
+    int nb_requests; /* protected by lock */
+    bool closing; /* protected by lock */
 
     uint32_t check_align; /* If non-zero, check for aligned client requests */
 
@@ -1415,11 +1417,18 @@  nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)
 
         len = qio_channel_readv(client->ioc, &iov, 1, errp);
         if (len == QIO_CHANNEL_ERR_BLOCK) {
-            client->read_yielding = true;
+            WITH_QEMU_LOCK_GUARD(&client->lock) {
+                if (client->quiescing) {
+                    return -EAGAIN;
+                }
+                client->read_yielding = true;
+            }
             qio_channel_yield(client->ioc, G_IO_IN);
-            client->read_yielding = false;
-            if (client->quiescing) {
-                return -EAGAIN;
+            WITH_QEMU_LOCK_GUARD(&client->lock) {
+                client->read_yielding = false;
+                if (client->quiescing) {
+                    return -EAGAIN;
+                }
             }
             continue;
         } else if (len < 0) {
@@ -1528,6 +1537,7 @@  void nbd_client_put(NBDClient *client)
             blk_exp_unref(&client->exp->common);
         }
         g_free(client->contexts.bitmaps);
+        qemu_mutex_destroy(&client->lock);
         g_free(client);
     }
 }
@@ -1536,11 +1546,13 @@  static void client_close(NBDClient *client, bool negotiated)
 {
     assert(qemu_in_main_thread());
 
-    if (client->closing) {
-        return;
-    }
+    WITH_QEMU_LOCK_GUARD(&client->lock) {
+        if (client->closing) {
+            return;
+        }
 
-    client->closing = true;
+        client->closing = true;
+    }
 
     /* Force requests to finish.  They will drop their own references,
      * then we'll close the socket and free the NBDClient.
@@ -1554,6 +1566,7 @@  static void client_close(NBDClient *client, bool negotiated)
     }
 }
 
+/* Runs in export AioContext with client->lock held */
 static NBDRequestData *nbd_request_get(NBDClient *client)
 {
     NBDRequestData *req;
@@ -1566,6 +1579,7 @@  static NBDRequestData *nbd_request_get(NBDClient *client)
     return req;
 }
 
+/* Runs in export AioContext with client->lock held */
 static void nbd_request_put(NBDRequestData *req)
 {
     NBDClient *client = req->client;
@@ -1589,14 +1603,18 @@  static void blk_aio_attached(AioContext *ctx, void *opaque)
     NBDExport *exp = opaque;
     NBDClient *client;
 
+    assert(qemu_in_main_thread());
+
     trace_nbd_blk_aio_attached(exp->name, ctx);
 
     exp->common.ctx = ctx;
 
     QTAILQ_FOREACH(client, &exp->clients, next) {
-        assert(client->nb_requests == 0);
-        assert(client->recv_coroutine == NULL);
-        assert(client->send_coroutine == NULL);
+        WITH_QEMU_LOCK_GUARD(&client->lock) {
+            assert(client->nb_requests == 0);
+            assert(client->recv_coroutine == NULL);
+            assert(client->send_coroutine == NULL);
+        }
     }
 }
 
@@ -1604,6 +1622,8 @@  static void blk_aio_detach(void *opaque)
 {
     NBDExport *exp = opaque;
 
+    assert(qemu_in_main_thread());
+
     trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
 
     exp->common.ctx = NULL;
@@ -1614,8 +1634,12 @@  static void nbd_drained_begin(void *opaque)
     NBDExport *exp = opaque;
     NBDClient *client;
 
+    assert(qemu_in_main_thread());
+
     QTAILQ_FOREACH(client, &exp->clients, next) {
-        client->quiescing = true;
+        WITH_QEMU_LOCK_GUARD(&client->lock) {
+            client->quiescing = true;
+        }
     }
 }
 
@@ -1624,9 +1648,13 @@  static void nbd_drained_end(void *opaque)
     NBDExport *exp = opaque;
     NBDClient *client;
 
+    assert(qemu_in_main_thread());
+
     QTAILQ_FOREACH(client, &exp->clients, next) {
-        client->quiescing = false;
-        nbd_client_receive_next_request(client);
+        WITH_QEMU_LOCK_GUARD(&client->lock) {
+            client->quiescing = false;
+            nbd_client_receive_next_request(client);
+        }
     }
 }
 
@@ -1635,17 +1663,21 @@  static bool nbd_drained_poll(void *opaque)
     NBDExport *exp = opaque;
     NBDClient *client;
 
+    assert(qemu_in_main_thread());
+
     QTAILQ_FOREACH(client, &exp->clients, next) {
-        if (client->nb_requests != 0) {
-            /*
-             * If there's a coroutine waiting for a request on nbd_read_eof()
-             * enter it here so we don't depend on the client to wake it up.
-             */
-            if (client->recv_coroutine != NULL && client->read_yielding) {
-                qio_channel_wake_read(client->ioc);
+        WITH_QEMU_LOCK_GUARD(&client->lock) {
+            if (client->nb_requests != 0) {
+                /*
+                 * If there's a coroutine waiting for a request on nbd_read_eof()
+                 * enter it here so we don't depend on the client to wake it up.
+                 */
+                if (client->recv_coroutine != NULL && client->read_yielding) {
+                    qio_channel_wake_read(client->ioc);
+                }
+
+                return true;
             }
-
-            return true;
         }
     }
 
@@ -1656,6 +1688,8 @@  static void nbd_eject_notifier(Notifier *n, void *data)
 {
     NBDExport *exp = container_of(n, NBDExport, eject_notifier);
 
+    assert(qemu_in_main_thread());
+
     blk_exp_request_shutdown(&exp->common);
 }
 
@@ -2541,7 +2575,6 @@  static int coroutine_fn nbd_co_receive_request(NBDRequestData *req,
     int ret;
 
     g_assert(qemu_in_coroutine());
-    assert(client->recv_coroutine == qemu_coroutine_self());
     ret = nbd_receive_request(client, request, errp);
     if (ret < 0) {
         return ret;
@@ -2950,7 +2983,11 @@  static coroutine_fn void nbd_trip(void *opaque)
      */
 
     trace_nbd_trip();
+
+    qemu_mutex_lock(&client->lock);
+
     if (client->closing) {
+        qemu_mutex_unlock(&client->lock);
         aio_co_reschedule_self(qemu_get_aio_context());
         nbd_client_put(client);
         return;
@@ -2961,15 +2998,24 @@  static coroutine_fn void nbd_trip(void *opaque)
          * We're switching between AIO contexts. Don't attempt to receive a new
          * request and kick the main context which may be waiting for us.
          */
-        aio_co_reschedule_self(qemu_get_aio_context());
-        nbd_client_put(client);
         client->recv_coroutine = NULL;
+        qemu_mutex_unlock(&client->lock);
         aio_wait_kick();
+
+        aio_co_reschedule_self(qemu_get_aio_context());
+        nbd_client_put(client);
         return;
     }
 
     req = nbd_request_get(client);
-    ret = nbd_co_receive_request(req, &request, &local_err);
+
+    do {
+        assert(client->recv_coroutine == qemu_coroutine_self());
+        qemu_mutex_unlock(&client->lock);
+        ret = nbd_co_receive_request(req, &request, &local_err);
+        qemu_mutex_lock(&client->lock);
+    } while (ret == -EAGAIN && !client->quiescing);
+
     client->recv_coroutine = NULL;
 
     if (client->closing) {
@@ -2981,11 +3027,13 @@  static coroutine_fn void nbd_trip(void *opaque)
     }
 
     if (ret == -EAGAIN) {
-        assert(client->quiescing);
         goto done;
     }
 
     nbd_client_receive_next_request(client);
+
+    qemu_mutex_unlock(&client->lock);
+
     if (ret == -EIO) {
         goto disconnect;
     }
@@ -3024,8 +3072,10 @@  static coroutine_fn void nbd_trip(void *opaque)
     }
 
     qio_channel_set_cork(client->ioc, false);
+    qemu_mutex_lock(&client->lock);
 done:
     nbd_request_put(req);
+    qemu_mutex_unlock(&client->lock);
 
     aio_co_reschedule_self(qemu_get_aio_context());
     nbd_client_put(client);
@@ -3035,13 +3085,20 @@  disconnect:
     if (local_err) {
         error_reportf_err(local_err, "Disconnect client, due to: ");
     }
+
+    qemu_mutex_lock(&client->lock);
     nbd_request_put(req);
+    qemu_mutex_unlock(&client->lock);
 
     aio_co_reschedule_self(qemu_get_aio_context());
     client_close(client, true);
     nbd_client_put(client);
 }
 
+/*
+ * Runs in export AioContext and main loop thread. Caller must hold
+ * client->lock.
+ */
 static void nbd_client_receive_next_request(NBDClient *client)
 {
     if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS &&
@@ -3067,7 +3124,9 @@  static coroutine_fn void nbd_co_client_start(void *opaque)
         return;
     }
 
-    nbd_client_receive_next_request(client);
+    WITH_QEMU_LOCK_GUARD(&client->lock) {
+        nbd_client_receive_next_request(client);
+    }
 }
 
 /*
@@ -3084,6 +3143,7 @@  void nbd_client_new(QIOChannelSocket *sioc,
     Coroutine *co;
 
     client = g_new0(NBDClient, 1);
+    qemu_mutex_init(&client->lock);
     client->refcount = 1;
     client->tlscreds = tlscreds;
     if (tlscreds) {