diff mbox series

[2/2] nbd/server: Use drained block ops to quiesce the server

Message ID 20210601055728.90849-3-slp@redhat.com (mailing list archive)
State New, archived
Headers show
Series nbd/server: Quiesce server on drained section | expand

Commit Message

Sergio Lopez June 1, 2021, 5:57 a.m. UTC
Before switching between AioContexts we need to make sure that we're
fully quiesced ("nb_requests == 0" for every client) when entering the
drained section.

To do this, we set "quiescing = true" for every client on
".drained_begin" to prevent new coroutines to be created, and check if
"nb_requests == 0" on ".drained_poll". Finally, once we're exiting the
drained section, on ".drained_end" we set "quiescing = false" and
call "nbd_client_receive_next_request()" to resume the processing of
new requests.

With these changes, "blk_aio_attach()" and "blk_aio_detach()" can be
reverted to be as simple as they were before f148ae7d36.

RHBZ: https://bugzilla.redhat.com/show_bug.cgi?id=1960137
Suggested-by: Kevin Wolf <kwolf@redhat.com>
Signed-off-by: Sergio Lopez <slp@redhat.com>
---
 nbd/server.c | 99 +++++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 75 insertions(+), 24 deletions(-)

Comments

Kevin Wolf June 1, 2021, 4:08 p.m. UTC | #1
Am 01.06.2021 um 07:57 hat Sergio Lopez geschrieben:
> Before switching between AioContexts we need to make sure that we're
> fully quiesced ("nb_requests == 0" for every client) when entering the
> drained section.
> 
> To do this, we set "quiescing = true" for every client on
> ".drained_begin" to prevent new coroutines to be created, and check if
> "nb_requests == 0" on ".drained_poll". Finally, once we're exiting the
> drained section, on ".drained_end" we set "quiescing = false" and
> call "nbd_client_receive_next_request()" to resume the processing of
> new requests.
> 
> With these changes, "blk_aio_attach()" and "blk_aio_detach()" can be
> reverted to be as simple as they were before f148ae7d36.
> 
> RHBZ: https://bugzilla.redhat.com/show_bug.cgi?id=1960137
> Suggested-by: Kevin Wolf <kwolf@redhat.com>
> Signed-off-by: Sergio Lopez <slp@redhat.com>
> ---
>  nbd/server.c | 99 +++++++++++++++++++++++++++++++++++++++-------------
>  1 file changed, 75 insertions(+), 24 deletions(-)
> 
> diff --git a/nbd/server.c b/nbd/server.c
> index 86a44a9b41..33e55479d7 100644
> --- a/nbd/server.c
> +++ b/nbd/server.c
> @@ -132,7 +132,7 @@ struct NBDClient {
>      CoMutex send_lock;
>      Coroutine *send_coroutine;
>  
> -    bool read_yielding;
> +    GSList *yield_co_list; /* List of coroutines yielding on nbd_read_eof */
>      bool quiescing;

Hm, how do you get more than one coroutine per client yielding in
nbd_read_eof() at the same time? I thought the model is that you always
have one coroutine reading the next request (which is
client->recv_coroutine) and all the others are just processing the
request they had read earlier. Multiple coroutines reading from the
same socket would sound like a bad idea.

>      QTAILQ_ENTRY(NBDClient) next;
> @@ -1367,6 +1367,7 @@ static inline int coroutine_fn
>  nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)
>  {
>      bool partial = false;
> +    Coroutine *co;
>  
>      assert(size);
>      while (size > 0) {
> @@ -1375,9 +1376,12 @@ nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)
>  
>          len = qio_channel_readv(client->ioc, &iov, 1, errp);
>          if (len == QIO_CHANNEL_ERR_BLOCK) {
> -            client->read_yielding = true;
> +            co = qemu_coroutine_self();
> +
> +            client->yield_co_list = g_slist_prepend(client->yield_co_list, co);
>              qio_channel_yield(client->ioc, G_IO_IN);
> -            client->read_yielding = false;
> +            client->yield_co_list = g_slist_remove(client->yield_co_list, co);
> +
>              if (client->quiescing) {
>                  return -EAGAIN;
>              }
> @@ -1513,6 +1517,11 @@ static void nbd_request_put(NBDRequestData *req)
>      g_free(req);
>  
>      client->nb_requests--;
> +
> +    if (client->quiescing && client->nb_requests == 0) {
> +        aio_wait_kick();
> +    }
> +
>      nbd_client_receive_next_request(client);
>  
>      nbd_client_put(client);
> @@ -1530,49 +1539,75 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
>      QTAILQ_FOREACH(client, &exp->clients, next) {
>          qio_channel_attach_aio_context(client->ioc, ctx);
>  
> +        assert(client->nb_requests == 0);
>          assert(client->recv_coroutine == NULL);
>          assert(client->send_coroutine == NULL);
> -
> -        if (client->quiescing) {
> -            client->quiescing = false;
> -            nbd_client_receive_next_request(client);
> -        }
>      }
>  }
>  
> -static void nbd_aio_detach_bh(void *opaque)
> +static void blk_aio_detach(void *opaque)
>  {
>      NBDExport *exp = opaque;
>      NBDClient *client;
>  
> +    trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
> +
>      QTAILQ_FOREACH(client, &exp->clients, next) {
>          qio_channel_detach_aio_context(client->ioc);
> +    }
> +
> +    exp->common.ctx = NULL;
> +}
> +
> +static void nbd_drained_begin(void *opaque)
> +{
> +    NBDExport *exp = opaque;
> +    NBDClient *client;
> +
> +    QTAILQ_FOREACH(client, &exp->clients, next) {
>          client->quiescing = true;
> +    }
> +}
>  
> -        if (client->recv_coroutine) {
> -            if (client->read_yielding) {
> -                qemu_aio_coroutine_enter(exp->common.ctx,
> -                                         client->recv_coroutine);
> -            } else {
> -                AIO_WAIT_WHILE(exp->common.ctx, client->recv_coroutine != NULL);
> -            }
> -        }
> +static void nbd_drained_end(void *opaque)
> +{
> +    NBDExport *exp = opaque;
> +    NBDClient *client;
>  
> -        if (client->send_coroutine) {
> -            AIO_WAIT_WHILE(exp->common.ctx, client->send_coroutine != NULL);
> -        }
> +    QTAILQ_FOREACH(client, &exp->clients, next) {
> +        client->quiescing = false;
> +        nbd_client_receive_next_request(client);
>      }
>  }
>  
> -static void blk_aio_detach(void *opaque)
> +static bool nbd_drained_poll(void *opaque)
>  {
>      NBDExport *exp = opaque;
> +    NBDClient *client;
> +    Coroutine *co;
> +    GSList *entry;
> +    GSList *coroutine_list;
>  
> -    trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
> +    QTAILQ_FOREACH(client, &exp->clients, next) {
> +        if (client->nb_requests != 0) {
> +            /*
> +             * Enter coroutines waiting for new requests on nbd_read_eof(), so
> +             * we don't depend on the client to wake us up.
> +             */
> +            coroutine_list = g_slist_copy(client->yield_co_list);
> +            for (entry = coroutine_list;
> +                 entry != NULL;
> +                 entry = g_slist_next(entry)) {
> +                co = entry->data;
> +                qemu_aio_coroutine_enter(exp->common.ctx, co);
> +            }
> +            g_slist_free(coroutine_list);
>  
> -    aio_wait_bh_oneshot(exp->common.ctx, nbd_aio_detach_bh, exp);
> +            return 1;

This would be more accurately spelt true...

> +        }
> +    }
>  
> -    exp->common.ctx = NULL;
> +    return 0;

...and this false.

>  }
>  
>  static void nbd_eject_notifier(Notifier *n, void *data)

The patch looks correct to me, though I'm not sure if yield_co_list is
an unnecessary complication (and if it isn't, whether that's safe).

I would be happy enough to apply it anyway if you can explain the
yield_co_list thing, but I'll give Eric some time to have a look, too.

Kevin
Sergio Lopez June 1, 2021, 4:31 p.m. UTC | #2
On Tue, Jun 01, 2021 at 06:08:41PM +0200, Kevin Wolf wrote:
> Am 01.06.2021 um 07:57 hat Sergio Lopez geschrieben:
> > Before switching between AioContexts we need to make sure that we're
> > fully quiesced ("nb_requests == 0" for every client) when entering the
> > drained section.
> > 
> > To do this, we set "quiescing = true" for every client on
> > ".drained_begin" to prevent new coroutines to be created, and check if
> > "nb_requests == 0" on ".drained_poll". Finally, once we're exiting the
> > drained section, on ".drained_end" we set "quiescing = false" and
> > call "nbd_client_receive_next_request()" to resume the processing of
> > new requests.
> > 
> > With these changes, "blk_aio_attach()" and "blk_aio_detach()" can be
> > reverted to be as simple as they were before f148ae7d36.
> > 
> > RHBZ: https://bugzilla.redhat.com/show_bug.cgi?id=1960137
> > Suggested-by: Kevin Wolf <kwolf@redhat.com>
> > Signed-off-by: Sergio Lopez <slp@redhat.com>
> > ---
> >  nbd/server.c | 99 +++++++++++++++++++++++++++++++++++++++-------------
> >  1 file changed, 75 insertions(+), 24 deletions(-)
> > 
> > diff --git a/nbd/server.c b/nbd/server.c
> > index 86a44a9b41..33e55479d7 100644
> > --- a/nbd/server.c
> > +++ b/nbd/server.c
> > @@ -132,7 +132,7 @@ struct NBDClient {
> >      CoMutex send_lock;
> >      Coroutine *send_coroutine;
> >  
> > -    bool read_yielding;
> > +    GSList *yield_co_list; /* List of coroutines yielding on nbd_read_eof */
> >      bool quiescing;
> 
> Hm, how do you get more than one coroutine per client yielding in
> nbd_read_eof() at the same time? I thought the model is that you always
> have one coroutine reading the next request (which is
> client->recv_coroutine) and all the others are just processing the
> request they had read earlier. Multiple coroutines reading from the
> same socket would sound like a bad idea.

You're right, there's only a single coroutine yielding on
nbd_read_eof(). I've added the list while at a moment I was trying to
keep track of every coroutine, and I kept it without thinking if it
was really needed.

I'll drop it, entering just client->recv_coroutine is it isn't NULL.

> >      QTAILQ_ENTRY(NBDClient) next;
> > @@ -1367,6 +1367,7 @@ static inline int coroutine_fn
> >  nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)
> >  {
> >      bool partial = false;
> > +    Coroutine *co;
> >  
> >      assert(size);
> >      while (size > 0) {
> > @@ -1375,9 +1376,12 @@ nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)
> >  
> >          len = qio_channel_readv(client->ioc, &iov, 1, errp);
> >          if (len == QIO_CHANNEL_ERR_BLOCK) {
> > -            client->read_yielding = true;
> > +            co = qemu_coroutine_self();
> > +
> > +            client->yield_co_list = g_slist_prepend(client->yield_co_list, co);
> >              qio_channel_yield(client->ioc, G_IO_IN);
> > -            client->read_yielding = false;
> > +            client->yield_co_list = g_slist_remove(client->yield_co_list, co);
> > +
> >              if (client->quiescing) {
> >                  return -EAGAIN;
> >              }
> > @@ -1513,6 +1517,11 @@ static void nbd_request_put(NBDRequestData *req)
> >      g_free(req);
> >  
> >      client->nb_requests--;
> > +
> > +    if (client->quiescing && client->nb_requests == 0) {
> > +        aio_wait_kick();
> > +    }
> > +
> >      nbd_client_receive_next_request(client);
> >  
> >      nbd_client_put(client);
> > @@ -1530,49 +1539,75 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
> >      QTAILQ_FOREACH(client, &exp->clients, next) {
> >          qio_channel_attach_aio_context(client->ioc, ctx);
> >  
> > +        assert(client->nb_requests == 0);
> >          assert(client->recv_coroutine == NULL);
> >          assert(client->send_coroutine == NULL);
> > -
> > -        if (client->quiescing) {
> > -            client->quiescing = false;
> > -            nbd_client_receive_next_request(client);
> > -        }
> >      }
> >  }
> >  
> > -static void nbd_aio_detach_bh(void *opaque)
> > +static void blk_aio_detach(void *opaque)
> >  {
> >      NBDExport *exp = opaque;
> >      NBDClient *client;
> >  
> > +    trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
> > +
> >      QTAILQ_FOREACH(client, &exp->clients, next) {
> >          qio_channel_detach_aio_context(client->ioc);
> > +    }
> > +
> > +    exp->common.ctx = NULL;
> > +}
> > +
> > +static void nbd_drained_begin(void *opaque)
> > +{
> > +    NBDExport *exp = opaque;
> > +    NBDClient *client;
> > +
> > +    QTAILQ_FOREACH(client, &exp->clients, next) {
> >          client->quiescing = true;
> > +    }
> > +}
> >  
> > -        if (client->recv_coroutine) {
> > -            if (client->read_yielding) {
> > -                qemu_aio_coroutine_enter(exp->common.ctx,
> > -                                         client->recv_coroutine);
> > -            } else {
> > -                AIO_WAIT_WHILE(exp->common.ctx, client->recv_coroutine != NULL);
> > -            }
> > -        }
> > +static void nbd_drained_end(void *opaque)
> > +{
> > +    NBDExport *exp = opaque;
> > +    NBDClient *client;
> >  
> > -        if (client->send_coroutine) {
> > -            AIO_WAIT_WHILE(exp->common.ctx, client->send_coroutine != NULL);
> > -        }
> > +    QTAILQ_FOREACH(client, &exp->clients, next) {
> > +        client->quiescing = false;
> > +        nbd_client_receive_next_request(client);
> >      }
> >  }
> >  
> > -static void blk_aio_detach(void *opaque)
> > +static bool nbd_drained_poll(void *opaque)
> >  {
> >      NBDExport *exp = opaque;
> > +    NBDClient *client;
> > +    Coroutine *co;
> > +    GSList *entry;
> > +    GSList *coroutine_list;
> >  
> > -    trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
> > +    QTAILQ_FOREACH(client, &exp->clients, next) {
> > +        if (client->nb_requests != 0) {
> > +            /*
> > +             * Enter coroutines waiting for new requests on nbd_read_eof(), so
> > +             * we don't depend on the client to wake us up.
> > +             */
> > +            coroutine_list = g_slist_copy(client->yield_co_list);
> > +            for (entry = coroutine_list;
> > +                 entry != NULL;
> > +                 entry = g_slist_next(entry)) {
> > +                co = entry->data;
> > +                qemu_aio_coroutine_enter(exp->common.ctx, co);
> > +            }
> > +            g_slist_free(coroutine_list);
> >  
> > -    aio_wait_bh_oneshot(exp->common.ctx, nbd_aio_detach_bh, exp);
> > +            return 1;
> 
> This would be more accurately spelt true...
> 
> > +        }
> > +    }
> >  
> > -    exp->common.ctx = NULL;
> > +    return 0;
> 
> ...and this false.

I'll change this in v2.

Thanks,
Sergio.

> >  }
> >  
> >  static void nbd_eject_notifier(Notifier *n, void *data)
> 
> The patch looks correct to me, though I'm not sure if yield_co_list is
> an unnecessary complication (and if it isn't, whether that's safe).
> 
> I would be happy enough to apply it anyway if you can explain the
> yield_co_list thing, but I'll give Eric some time to have a look, too.
> 
> Kevin
>
Eric Blake June 1, 2021, 9:29 p.m. UTC | #3
On Tue, Jun 01, 2021 at 07:57:28AM +0200, Sergio Lopez wrote:
> Before switching between AioContexts we need to make sure that we're
> fully quiesced ("nb_requests == 0" for every client) when entering the
> drained section.
> 
> To do this, we set "quiescing = true" for every client on
> ".drained_begin" to prevent new coroutines to be created, and check if

s/to be created/from being created/

> "nb_requests == 0" on ".drained_poll". Finally, once we're exiting the
> drained section, on ".drained_end" we set "quiescing = false" and
> call "nbd_client_receive_next_request()" to resume the processing of
> new requests.
> 
> With these changes, "blk_aio_attach()" and "blk_aio_detach()" can be
> reverted to be as simple as they were before f148ae7d36.

Is that reversion planned to be patch 3 of your series in v2?

> 
> RHBZ: https://bugzilla.redhat.com/show_bug.cgi?id=1960137
> Suggested-by: Kevin Wolf <kwolf@redhat.com>
> Signed-off-by: Sergio Lopez <slp@redhat.com>
> ---
>  nbd/server.c | 99 +++++++++++++++++++++++++++++++++++++++-------------
>  1 file changed, 75 insertions(+), 24 deletions(-)
>
Eric Blake June 1, 2021, 9:31 p.m. UTC | #4
On Tue, Jun 01, 2021 at 06:31:29PM +0200, Sergio Lopez wrote:
> > Hm, how do you get more than one coroutine per client yielding in
> > nbd_read_eof() at the same time? I thought the model is that you always
> > have one coroutine reading the next request (which is
> > client->recv_coroutine) and all the others are just processing the
> > request they had read earlier. Multiple coroutines reading from the
> > same socket would sound like a bad idea.
> 
> You're right, there's only a single coroutine yielding on
> nbd_read_eof(). I've added the list while at a moment I was trying to
> keep track of every coroutine, and I kept it without thinking if it
> was really needed.
> 
> I'll drop it, entering just client->recv_coroutine is it isn't NULL.

Sounds like I'll wait for the v2 before applying.  But the overall
logic changes made sense to me.

> > The patch looks correct to me, though I'm not sure if yield_co_list is
> > an unnecessary complication (and if it isn't, whether that's safe).
> > 
> > I would be happy enough to apply it anyway if you can explain the
> > yield_co_list thing, but I'll give Eric some time to have a look, too.

Thanks for catching my attention on this!
Sergio Lopez June 2, 2021, 5:52 a.m. UTC | #5
On Tue, Jun 01, 2021 at 04:29:07PM -0500, Eric Blake wrote:
> On Tue, Jun 01, 2021 at 07:57:28AM +0200, Sergio Lopez wrote:
> > Before switching between AioContexts we need to make sure that we're
> > fully quiesced ("nb_requests == 0" for every client) when entering the
> > drained section.
> > 
> > To do this, we set "quiescing = true" for every client on
> > ".drained_begin" to prevent new coroutines to be created, and check if
> 
> s/to be created/from being created/
> 
> > "nb_requests == 0" on ".drained_poll". Finally, once we're exiting the
> > drained section, on ".drained_end" we set "quiescing = false" and
> > call "nbd_client_receive_next_request()" to resume the processing of
> > new requests.
> > 
> > With these changes, "blk_aio_attach()" and "blk_aio_detach()" can be
> > reverted to be as simple as they were before f148ae7d36.
> 
> Is that reversion planned to be patch 3 of your series in v2?

Actually, we need part of the changes introduced in f148ae7d36, so
it's probably simpler to manually revert "blk_aio_attach()" and
"blk_aio_detach()" here than doing an actual reversion and then
reintroducing the changes.

Thanks,
Sergio.
diff mbox series

Patch

diff --git a/nbd/server.c b/nbd/server.c
index 86a44a9b41..33e55479d7 100644
--- a/nbd/server.c
+++ b/nbd/server.c
@@ -132,7 +132,7 @@  struct NBDClient {
     CoMutex send_lock;
     Coroutine *send_coroutine;
 
-    bool read_yielding;
+    GSList *yield_co_list; /* List of coroutines yielding on nbd_read_eof */
     bool quiescing;
 
     QTAILQ_ENTRY(NBDClient) next;
@@ -1367,6 +1367,7 @@  static inline int coroutine_fn
 nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)
 {
     bool partial = false;
+    Coroutine *co;
 
     assert(size);
     while (size > 0) {
@@ -1375,9 +1376,12 @@  nbd_read_eof(NBDClient *client, void *buffer, size_t size, Error **errp)
 
         len = qio_channel_readv(client->ioc, &iov, 1, errp);
         if (len == QIO_CHANNEL_ERR_BLOCK) {
-            client->read_yielding = true;
+            co = qemu_coroutine_self();
+
+            client->yield_co_list = g_slist_prepend(client->yield_co_list, co);
             qio_channel_yield(client->ioc, G_IO_IN);
-            client->read_yielding = false;
+            client->yield_co_list = g_slist_remove(client->yield_co_list, co);
+
             if (client->quiescing) {
                 return -EAGAIN;
             }
@@ -1513,6 +1517,11 @@  static void nbd_request_put(NBDRequestData *req)
     g_free(req);
 
     client->nb_requests--;
+
+    if (client->quiescing && client->nb_requests == 0) {
+        aio_wait_kick();
+    }
+
     nbd_client_receive_next_request(client);
 
     nbd_client_put(client);
@@ -1530,49 +1539,75 @@  static void blk_aio_attached(AioContext *ctx, void *opaque)
     QTAILQ_FOREACH(client, &exp->clients, next) {
         qio_channel_attach_aio_context(client->ioc, ctx);
 
+        assert(client->nb_requests == 0);
         assert(client->recv_coroutine == NULL);
         assert(client->send_coroutine == NULL);
-
-        if (client->quiescing) {
-            client->quiescing = false;
-            nbd_client_receive_next_request(client);
-        }
     }
 }
 
-static void nbd_aio_detach_bh(void *opaque)
+static void blk_aio_detach(void *opaque)
 {
     NBDExport *exp = opaque;
     NBDClient *client;
 
+    trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
+
     QTAILQ_FOREACH(client, &exp->clients, next) {
         qio_channel_detach_aio_context(client->ioc);
+    }
+
+    exp->common.ctx = NULL;
+}
+
+static void nbd_drained_begin(void *opaque)
+{
+    NBDExport *exp = opaque;
+    NBDClient *client;
+
+    QTAILQ_FOREACH(client, &exp->clients, next) {
         client->quiescing = true;
+    }
+}
 
-        if (client->recv_coroutine) {
-            if (client->read_yielding) {
-                qemu_aio_coroutine_enter(exp->common.ctx,
-                                         client->recv_coroutine);
-            } else {
-                AIO_WAIT_WHILE(exp->common.ctx, client->recv_coroutine != NULL);
-            }
-        }
+static void nbd_drained_end(void *opaque)
+{
+    NBDExport *exp = opaque;
+    NBDClient *client;
 
-        if (client->send_coroutine) {
-            AIO_WAIT_WHILE(exp->common.ctx, client->send_coroutine != NULL);
-        }
+    QTAILQ_FOREACH(client, &exp->clients, next) {
+        client->quiescing = false;
+        nbd_client_receive_next_request(client);
     }
 }
 
-static void blk_aio_detach(void *opaque)
+static bool nbd_drained_poll(void *opaque)
 {
     NBDExport *exp = opaque;
+    NBDClient *client;
+    Coroutine *co;
+    GSList *entry;
+    GSList *coroutine_list;
 
-    trace_nbd_blk_aio_detach(exp->name, exp->common.ctx);
+    QTAILQ_FOREACH(client, &exp->clients, next) {
+        if (client->nb_requests != 0) {
+            /*
+             * Enter coroutines waiting for new requests on nbd_read_eof(), so
+             * we don't depend on the client to wake us up.
+             */
+            coroutine_list = g_slist_copy(client->yield_co_list);
+            for (entry = coroutine_list;
+                 entry != NULL;
+                 entry = g_slist_next(entry)) {
+                co = entry->data;
+                qemu_aio_coroutine_enter(exp->common.ctx, co);
+            }
+            g_slist_free(coroutine_list);
 
-    aio_wait_bh_oneshot(exp->common.ctx, nbd_aio_detach_bh, exp);
+            return 1;
+        }
+    }
 
-    exp->common.ctx = NULL;
+    return 0;
 }
 
 static void nbd_eject_notifier(Notifier *n, void *data)
@@ -1594,6 +1629,12 @@  void nbd_export_set_on_eject_blk(BlockExport *exp, BlockBackend *blk)
     blk_add_remove_bs_notifier(blk, &nbd_exp->eject_notifier);
 }
 
+static const BlockDevOps nbd_block_ops = {
+    .drained_begin = nbd_drained_begin,
+    .drained_end = nbd_drained_end,
+    .drained_poll = nbd_drained_poll,
+};
+
 static int nbd_export_create(BlockExport *blk_exp, BlockExportOptions *exp_args,
                              Error **errp)
 {
@@ -1715,8 +1756,17 @@  static int nbd_export_create(BlockExport *blk_exp, BlockExportOptions *exp_args,
 
     exp->allocation_depth = arg->allocation_depth;
 
+    /*
+     * We need to inhibit request queuing in the block layer to ensure we can
+     * be properly quiesced when entering a drained section, as our coroutines
+     * servicing pending requests might enter blk_pread().
+     */
+    blk_set_disable_request_queuing(blk, true);
+
     blk_add_aio_context_notifier(blk, blk_aio_attached, blk_aio_detach, exp);
 
+    blk_set_dev_ops(blk, &nbd_block_ops, exp);
+
     QTAILQ_INSERT_TAIL(&exports, exp, next);
 
     return 0;
@@ -1788,6 +1838,7 @@  static void nbd_export_delete(BlockExport *blk_exp)
         }
         blk_remove_aio_context_notifier(exp->common.blk, blk_aio_attached,
                                         blk_aio_detach, exp);
+        blk_set_disable_request_queuing(exp->common.blk, false);
     }
 
     for (i = 0; i < exp->nr_export_bitmaps; i++) {