diff mbox series

[3/5] block/nbd.c: Add yank feature

Message ID 1e712fa7f08e4772c2a68197a851161bee51610f.1589193717.git.lukasstraub2@web.de (mailing list archive)
State New, archived
Headers show
Series Introduce 'yank' oob qmp command to recover from hanging qemu | expand

Commit Message

Lukas Straub May 11, 2020, 11:14 a.m. UTC
Add yank option, pass it to the socket-channel and register a yank
function which sets s->state = NBD_CLIENT_QUIT. This is the same
behaviour as if an error occured.

Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
 Makefile.objs        |  1 +
 block/nbd.c          | 68 +++++++++++++++++++++++++++++++++-----------
 qapi/block-core.json |  5 +++-
 3 files changed, 56 insertions(+), 18 deletions(-)

Comments

Dr. David Alan Gilbert May 11, 2020, 4:19 p.m. UTC | #1
* Lukas Straub (lukasstraub2@web.de) wrote:
> Add yank option, pass it to the socket-channel and register a yank
> function which sets s->state = NBD_CLIENT_QUIT. This is the same
> behaviour as if an error occured.
> 
> Signed-off-by: Lukas Straub <lukasstraub2@web.de>

> +static void nbd_yank(void *opaque)
> +{
> +    BlockDriverState *bs = opaque;
> +    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
> +
> +    atomic_set(&s->state, NBD_CLIENT_QUIT);

I think I was expecting a shutdown on the socket here - why doesn't it
have one?

Dave

> +}
> +
>  static void nbd_client_close(BlockDriverState *bs)
>  {
>      BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
> @@ -1407,14 +1421,17 @@ static void nbd_client_close(BlockDriverState *bs)
>      nbd_teardown_connection(bs);
>  }
>  
> -static QIOChannelSocket *nbd_establish_connection(SocketAddress *saddr,
> +static QIOChannelSocket *nbd_establish_connection(BlockDriverState *bs,
> +                                                  SocketAddress *saddr,
>                                                    Error **errp)
>  {
> +    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
>      QIOChannelSocket *sioc;
>      Error *local_err = NULL;
>  
>      sioc = qio_channel_socket_new();
>      qio_channel_set_name(QIO_CHANNEL(sioc), "nbd-client");
> +    qio_channel_set_yank(QIO_CHANNEL(sioc), s->yank);
>  
>      qio_channel_socket_connect_sync(sioc, saddr, &local_err);
>      if (local_err) {
> @@ -1438,7 +1455,7 @@ static int nbd_client_connect(BlockDriverState *bs, Error **errp)
>       * establish TCP connection, return error if it fails
>       * TODO: Configurable retry-until-timeout behaviour.
>       */
> -    QIOChannelSocket *sioc = nbd_establish_connection(s->saddr, errp);
> +    QIOChannelSocket *sioc = nbd_establish_connection(bs, s->saddr, errp);
>  
>      if (!sioc) {
>          return -ECONNREFUSED;
> @@ -1829,6 +1846,12 @@ static QemuOptsList nbd_runtime_opts = {
>                      "future requests before a successful reconnect will "
>                      "immediately fail. Default 0",
>          },
> +        {
> +            .name = "yank",
> +            .type = QEMU_OPT_BOOL,
> +            .help = "Forcibly close the connection and don't attempt to "
> +                    "reconnect when the 'yank' qmp command is executed.",
> +        },
>          { /* end of list */ }
>      },
>  };
> @@ -1888,6 +1911,8 @@ static int nbd_process_options(BlockDriverState *bs, QDict *options,
>  
>      s->reconnect_delay = qemu_opt_get_number(opts, "reconnect-delay", 0);
>  
> +    s->yank = qemu_opt_get_bool(opts, "yank", false);
> +
>      ret = 0;
>  
>   error:
> @@ -1921,6 +1946,10 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
>      /* successfully connected */
>      s->state = NBD_CLIENT_CONNECTED;
>  
> +    if (s->yank) {
> +        yank_register_function(nbd_yank, bs);
> +    }
> +
>      s->connection_co = qemu_coroutine_create(nbd_connection_entry, s);
>      bdrv_inc_in_flight(bs);
>      aio_co_schedule(bdrv_get_aio_context(bs), s->connection_co);
> @@ -1972,6 +2001,11 @@ static void nbd_close(BlockDriverState *bs)
>      BDRVNBDState *s = bs->opaque;
>  
>      nbd_client_close(bs);
> +
> +    if (s->yank) {
> +        yank_unregister_function(nbd_yank, bs);
> +    }
> +
>      nbd_clear_bdrvstate(s);
>  }
>  
> diff --git a/qapi/block-core.json b/qapi/block-core.json
> index 943df1926a..1c1578160e 100644
> --- a/qapi/block-core.json
> +++ b/qapi/block-core.json
> @@ -3862,6 +3862,8 @@
>  #                   reconnect. After that time, any delayed requests and all
>  #                   future requests before a successful reconnect will
>  #                   immediately fail. Default 0 (Since 4.2)
> +# @yank: Forcibly close the connection and don't attempt to reconnect when
> +#        the 'yank' qmp command is executed. (Since: 5.1)
>  #
>  # Since: 2.9
>  ##
> @@ -3870,7 +3872,8 @@
>              '*export': 'str',
>              '*tls-creds': 'str',
>              '*x-dirty-bitmap': 'str',
> -            '*reconnect-delay': 'uint32' } }
> +            '*reconnect-delay': 'uint32',
> +	    'yank': 'bool' } }
>  
>  ##
>  # @BlockdevOptionsRaw:
> -- 
> 2.20.1
> 


--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Lukas Straub May 11, 2020, 5:05 p.m. UTC | #2
On Mon, 11 May 2020 17:19:09 +0100
"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:

> * Lukas Straub (lukasstraub2@web.de) wrote:
> > Add yank option, pass it to the socket-channel and register a yank
> > function which sets s->state = NBD_CLIENT_QUIT. This is the same
> > behaviour as if an error occured.
> > 
> > Signed-off-by: Lukas Straub <lukasstraub2@web.de>  
> 
> > +static void nbd_yank(void *opaque)
> > +{
> > +    BlockDriverState *bs = opaque;
> > +    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
> > +
> > +    atomic_set(&s->state, NBD_CLIENT_QUIT);  
> 
> I think I was expecting a shutdown on the socket here - why doesn't it
> have one?

For nbd, we register two yank functions: This one and we enable the yank feature on the qio channel (see function nbd_establish_connection below).

Regards,
Lukas Straub

> Dave
> 
> > +}
> > +
> >  static void nbd_client_close(BlockDriverState *bs)
> >  {
> >      BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
> > @@ -1407,14 +1421,17 @@ static void nbd_client_close(BlockDriverState *bs)
> >      nbd_teardown_connection(bs);
> >  }
> >  
> > -static QIOChannelSocket *nbd_establish_connection(SocketAddress *saddr,
> > +static QIOChannelSocket *nbd_establish_connection(BlockDriverState *bs,
> > +                                                  SocketAddress *saddr,
> >                                                    Error **errp)
> >  {
> > +    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
> >      QIOChannelSocket *sioc;
> >      Error *local_err = NULL;
> >  
> >      sioc = qio_channel_socket_new();
> >      qio_channel_set_name(QIO_CHANNEL(sioc), "nbd-client");
> > +    qio_channel_set_yank(QIO_CHANNEL(sioc), s->yank);
> >  
> >      qio_channel_socket_connect_sync(sioc, saddr, &local_err);
> >      if (local_err) {
> > @@ -1438,7 +1455,7 @@ static int nbd_client_connect(BlockDriverState *bs, Error **errp)
> >       * establish TCP connection, return error if it fails
> >       * TODO: Configurable retry-until-timeout behaviour.
> >       */
> > -    QIOChannelSocket *sioc = nbd_establish_connection(s->saddr, errp);
> > +    QIOChannelSocket *sioc = nbd_establish_connection(bs, s->saddr, errp);
> >  
> >      if (!sioc) {
> >          return -ECONNREFUSED;
> > @@ -1829,6 +1846,12 @@ static QemuOptsList nbd_runtime_opts = {
> >                      "future requests before a successful reconnect will "
> >                      "immediately fail. Default 0",
> >          },
> > +        {
> > +            .name = "yank",
> > +            .type = QEMU_OPT_BOOL,
> > +            .help = "Forcibly close the connection and don't attempt to "
> > +                    "reconnect when the 'yank' qmp command is executed.",
> > +        },
> >          { /* end of list */ }
> >      },
> >  };
> > @@ -1888,6 +1911,8 @@ static int nbd_process_options(BlockDriverState *bs, QDict *options,
> >  
> >      s->reconnect_delay = qemu_opt_get_number(opts, "reconnect-delay", 0);
> >  
> > +    s->yank = qemu_opt_get_bool(opts, "yank", false);
> > +
> >      ret = 0;
> >  
> >   error:
> > @@ -1921,6 +1946,10 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
> >      /* successfully connected */
> >      s->state = NBD_CLIENT_CONNECTED;
> >  
> > +    if (s->yank) {
> > +        yank_register_function(nbd_yank, bs);
> > +    }
> > +
> >      s->connection_co = qemu_coroutine_create(nbd_connection_entry, s);
> >      bdrv_inc_in_flight(bs);
> >      aio_co_schedule(bdrv_get_aio_context(bs), s->connection_co);
> > @@ -1972,6 +2001,11 @@ static void nbd_close(BlockDriverState *bs)
> >      BDRVNBDState *s = bs->opaque;
> >  
> >      nbd_client_close(bs);
> > +
> > +    if (s->yank) {
> > +        yank_unregister_function(nbd_yank, bs);
> > +    }
> > +
> >      nbd_clear_bdrvstate(s);
> >  }
> >  
> > diff --git a/qapi/block-core.json b/qapi/block-core.json
> > index 943df1926a..1c1578160e 100644
> > --- a/qapi/block-core.json
> > +++ b/qapi/block-core.json
> > @@ -3862,6 +3862,8 @@
> >  #                   reconnect. After that time, any delayed requests and all
> >  #                   future requests before a successful reconnect will
> >  #                   immediately fail. Default 0 (Since 4.2)
> > +# @yank: Forcibly close the connection and don't attempt to reconnect when
> > +#        the 'yank' qmp command is executed. (Since: 5.1)
> >  #
> >  # Since: 2.9
> >  ##
> > @@ -3870,7 +3872,8 @@
> >              '*export': 'str',
> >              '*tls-creds': 'str',
> >              '*x-dirty-bitmap': 'str',
> > -            '*reconnect-delay': 'uint32' } }
> > +            '*reconnect-delay': 'uint32',
> > +	    'yank': 'bool' } }
> >  
> >  ##
> >  # @BlockdevOptionsRaw:
> > -- 
> > 2.20.1
> >   
> 
> 
> --
> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
>
Dr. David Alan Gilbert May 11, 2020, 5:24 p.m. UTC | #3
* Lukas Straub (lukasstraub2@web.de) wrote:
> On Mon, 11 May 2020 17:19:09 +0100
> "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> 
> > * Lukas Straub (lukasstraub2@web.de) wrote:
> > > Add yank option, pass it to the socket-channel and register a yank
> > > function which sets s->state = NBD_CLIENT_QUIT. This is the same
> > > behaviour as if an error occured.
> > > 
> > > Signed-off-by: Lukas Straub <lukasstraub2@web.de>  
> > 
> > > +static void nbd_yank(void *opaque)
> > > +{
> > > +    BlockDriverState *bs = opaque;
> > > +    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
> > > +
> > > +    atomic_set(&s->state, NBD_CLIENT_QUIT);  
> > 
> > I think I was expecting a shutdown on the socket here - why doesn't it
> > have one?
> 
> For nbd, we register two yank functions: This one and we enable the yank feature on the qio channel (see function nbd_establish_connection below).

Oh I see; yeh that still surprises me a little; I'd expected one yank
per item.

Dave

> Regards,
> Lukas Straub
> 
> > Dave
> > 
> > > +}
> > > +
> > >  static void nbd_client_close(BlockDriverState *bs)
> > >  {
> > >      BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
> > > @@ -1407,14 +1421,17 @@ static void nbd_client_close(BlockDriverState *bs)
> > >      nbd_teardown_connection(bs);
> > >  }
> > >  
> > > -static QIOChannelSocket *nbd_establish_connection(SocketAddress *saddr,
> > > +static QIOChannelSocket *nbd_establish_connection(BlockDriverState *bs,
> > > +                                                  SocketAddress *saddr,
> > >                                                    Error **errp)
> > >  {
> > > +    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
> > >      QIOChannelSocket *sioc;
> > >      Error *local_err = NULL;
> > >  
> > >      sioc = qio_channel_socket_new();
> > >      qio_channel_set_name(QIO_CHANNEL(sioc), "nbd-client");
> > > +    qio_channel_set_yank(QIO_CHANNEL(sioc), s->yank);
> > >  
> > >      qio_channel_socket_connect_sync(sioc, saddr, &local_err);
> > >      if (local_err) {
> > > @@ -1438,7 +1455,7 @@ static int nbd_client_connect(BlockDriverState *bs, Error **errp)
> > >       * establish TCP connection, return error if it fails
> > >       * TODO: Configurable retry-until-timeout behaviour.
> > >       */
> > > -    QIOChannelSocket *sioc = nbd_establish_connection(s->saddr, errp);
> > > +    QIOChannelSocket *sioc = nbd_establish_connection(bs, s->saddr, errp);
> > >  
> > >      if (!sioc) {
> > >          return -ECONNREFUSED;
> > > @@ -1829,6 +1846,12 @@ static QemuOptsList nbd_runtime_opts = {
> > >                      "future requests before a successful reconnect will "
> > >                      "immediately fail. Default 0",
> > >          },
> > > +        {
> > > +            .name = "yank",
> > > +            .type = QEMU_OPT_BOOL,
> > > +            .help = "Forcibly close the connection and don't attempt to "
> > > +                    "reconnect when the 'yank' qmp command is executed.",
> > > +        },
> > >          { /* end of list */ }
> > >      },
> > >  };
> > > @@ -1888,6 +1911,8 @@ static int nbd_process_options(BlockDriverState *bs, QDict *options,
> > >  
> > >      s->reconnect_delay = qemu_opt_get_number(opts, "reconnect-delay", 0);
> > >  
> > > +    s->yank = qemu_opt_get_bool(opts, "yank", false);
> > > +
> > >      ret = 0;
> > >  
> > >   error:
> > > @@ -1921,6 +1946,10 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
> > >      /* successfully connected */
> > >      s->state = NBD_CLIENT_CONNECTED;
> > >  
> > > +    if (s->yank) {
> > > +        yank_register_function(nbd_yank, bs);
> > > +    }
> > > +
> > >      s->connection_co = qemu_coroutine_create(nbd_connection_entry, s);
> > >      bdrv_inc_in_flight(bs);
> > >      aio_co_schedule(bdrv_get_aio_context(bs), s->connection_co);
> > > @@ -1972,6 +2001,11 @@ static void nbd_close(BlockDriverState *bs)
> > >      BDRVNBDState *s = bs->opaque;
> > >  
> > >      nbd_client_close(bs);
> > > +
> > > +    if (s->yank) {
> > > +        yank_unregister_function(nbd_yank, bs);
> > > +    }
> > > +
> > >      nbd_clear_bdrvstate(s);
> > >  }
> > >  
> > > diff --git a/qapi/block-core.json b/qapi/block-core.json
> > > index 943df1926a..1c1578160e 100644
> > > --- a/qapi/block-core.json
> > > +++ b/qapi/block-core.json
> > > @@ -3862,6 +3862,8 @@
> > >  #                   reconnect. After that time, any delayed requests and all
> > >  #                   future requests before a successful reconnect will
> > >  #                   immediately fail. Default 0 (Since 4.2)
> > > +# @yank: Forcibly close the connection and don't attempt to reconnect when
> > > +#        the 'yank' qmp command is executed. (Since: 5.1)
> > >  #
> > >  # Since: 2.9
> > >  ##
> > > @@ -3870,7 +3872,8 @@
> > >              '*export': 'str',
> > >              '*tls-creds': 'str',
> > >              '*x-dirty-bitmap': 'str',
> > > -            '*reconnect-delay': 'uint32' } }
> > > +            '*reconnect-delay': 'uint32',
> > > +	    'yank': 'bool' } }
> > >  
> > >  ##
> > >  # @BlockdevOptionsRaw:
> > > -- 
> > > 2.20.1
> > >   
> > 
> > 
> > --
> > Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
> > 
> 


--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Daniel P. Berrangé May 12, 2020, 8:54 a.m. UTC | #4
On Mon, May 11, 2020 at 07:05:24PM +0200, Lukas Straub wrote:
> On Mon, 11 May 2020 17:19:09 +0100
> "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> 
> > * Lukas Straub (lukasstraub2@web.de) wrote:
> > > Add yank option, pass it to the socket-channel and register a yank
> > > function which sets s->state = NBD_CLIENT_QUIT. This is the same
> > > behaviour as if an error occured.
> > > 
> > > Signed-off-by: Lukas Straub <lukasstraub2@web.de>  
> > 
> > > +static void nbd_yank(void *opaque)
> > > +{
> > > +    BlockDriverState *bs = opaque;
> > > +    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
> > > +
> > > +    atomic_set(&s->state, NBD_CLIENT_QUIT);  
> > 
> > I think I was expecting a shutdown on the socket here - why doesn't it
> > have one?
> 
> For nbd, we register two yank functions: This one and we enable
> the yank feature on the qio channel (see function
> nbd_establish_connection below).

As mentioned on the earlier patch, I don't want to see any yank
code in the QIOChannel object directly. This nbd_yank function
can simply call the qio_channel_shutdown() function directly
and avoid need for modifying the QIOChannel object with yank
support.

This will make the NBD code clearer too, as we can see exactly
what actions are performed at NBD layer when a yank happens,
which is what David was not seeing clearly here.

Regards,
Daniel
Lukas Straub May 15, 2020, 9:48 a.m. UTC | #5
On Tue, 12 May 2020 09:54:58 +0100
Daniel P. Berrangé <berrange@redhat.com> wrote:

> On Mon, May 11, 2020 at 07:05:24PM +0200, Lukas Straub wrote:
> > On Mon, 11 May 2020 17:19:09 +0100
> > "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> >   
> > > * Lukas Straub (lukasstraub2@web.de) wrote:  
> > > > Add yank option, pass it to the socket-channel and register a yank
> > > > function which sets s->state = NBD_CLIENT_QUIT. This is the same
> > > > behaviour as if an error occured.
> > > > 
> > > > Signed-off-by: Lukas Straub <lukasstraub2@web.de>    
> > >   
> > > > +static void nbd_yank(void *opaque)
> > > > +{
> > > > +    BlockDriverState *bs = opaque;
> > > > +    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
> > > > +
> > > > +    atomic_set(&s->state, NBD_CLIENT_QUIT);    
> > > 
> > > I think I was expecting a shutdown on the socket here - why doesn't it
> > > have one?  
> > 
> > For nbd, we register two yank functions: This one and we enable
> > the yank feature on the qio channel (see function
> > nbd_establish_connection below).  
> 
> As mentioned on the earlier patch, I don't want to see any yank
> code in the QIOChannel object directly. This nbd_yank function
> can simply call the qio_channel_shutdown() function directly
> and avoid need for modifying the QIOChannel object with yank
> support.

Hi,
Looking at it again, the problem is not with registering the yank functions, but with tracking the lifetime of it. Suppose we add qio_channel_shutdown to the yank_nbd function. Then we need to unregister it whenever the QIOChannel object is freed.

In the code that would lead to the following constructs in a lot of places:
     if (local_err) {
         yank_unregister_function(s->yank_name, yank_nbd, bs);
         object_unref(OBJECT(sioc));
         error_propagate(errp, local_err);
         return NULL;
     }

If you don't want the code in QIOChannel I guess I can create a subclass (YankableChannelSocket?) of QIOChannelSocket. What do you think?

Regards,
Lukas Straub

> This will make the NBD code clearer too, as we can see exactly
> what actions are performed at NBD layer when a yank happens,
> which is what David was not seeing clearly here.
> 
> Regards,
> Daniel
Daniel P. Berrangé May 15, 2020, 10:04 a.m. UTC | #6
On Fri, May 15, 2020 at 11:48:18AM +0200, Lukas Straub wrote:
> On Tue, 12 May 2020 09:54:58 +0100
> Daniel P. Berrangé <berrange@redhat.com> wrote:
> 
> > On Mon, May 11, 2020 at 07:05:24PM +0200, Lukas Straub wrote:
> > > On Mon, 11 May 2020 17:19:09 +0100
> > > "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> > >   
> > > > * Lukas Straub (lukasstraub2@web.de) wrote:  
> > > > > Add yank option, pass it to the socket-channel and register a yank
> > > > > function which sets s->state = NBD_CLIENT_QUIT. This is the same
> > > > > behaviour as if an error occured.
> > > > > 
> > > > > Signed-off-by: Lukas Straub <lukasstraub2@web.de>    
> > > >   
> > > > > +static void nbd_yank(void *opaque)
> > > > > +{
> > > > > +    BlockDriverState *bs = opaque;
> > > > > +    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
> > > > > +
> > > > > +    atomic_set(&s->state, NBD_CLIENT_QUIT);    
> > > > 
> > > > I think I was expecting a shutdown on the socket here - why doesn't it
> > > > have one?  
> > > 
> > > For nbd, we register two yank functions: This one and we enable
> > > the yank feature on the qio channel (see function
> > > nbd_establish_connection below).  
> > 
> > As mentioned on the earlier patch, I don't want to see any yank
> > code in the QIOChannel object directly. This nbd_yank function
> > can simply call the qio_channel_shutdown() function directly
> > and avoid need for modifying the QIOChannel object with yank
> > support.
> 
> Hi,
> Looking at it again, the problem is not with registering the yank functions, but with tracking the lifetime of it. Suppose we add qio_channel_shutdown to the yank_nbd function. Then we need to unregister it whenever the QIOChannel object is freed.
> 
> In the code that would lead to the following constructs in a lot of places:
>      if (local_err) {
>          yank_unregister_function(s->yank_name, yank_nbd, bs);
>          object_unref(OBJECT(sioc));
>          error_propagate(errp, local_err);
>          return NULL;
>      }

The nbd patch here already has a yank_unregister_function() so I'm
not seeing anything changes in that respect. The "yank_nbd" function
should check that the I/O channel is non-NULL before calling the
qio_channel_shutdown method.

> If you don't want the code in QIOChannel I guess I can create a
> subclass (YankableChannelSocket?) of QIOChannelSocket. What do
> you think?

That's no better, and I don't see any compelling need for it as calling
yank_unregister_function is something nbd already does in its nbd_close
function. It isn't a burden for the other backends to do similarly.



Regards,
Daniel
Lukas Straub May 15, 2020, 10:14 a.m. UTC | #7
On Fri, 15 May 2020 11:04:13 +0100
Daniel P. Berrangé <berrange@redhat.com> wrote:

> On Fri, May 15, 2020 at 11:48:18AM +0200, Lukas Straub wrote:
> > On Tue, 12 May 2020 09:54:58 +0100
> > Daniel P. Berrangé <berrange@redhat.com> wrote:
> >   
> > > On Mon, May 11, 2020 at 07:05:24PM +0200, Lukas Straub wrote:  
> > > > On Mon, 11 May 2020 17:19:09 +0100
> > > > "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> > > >     
> > > > > * Lukas Straub (lukasstraub2@web.de) wrote:    
> > > > > > Add yank option, pass it to the socket-channel and register a yank
> > > > > > function which sets s->state = NBD_CLIENT_QUIT. This is the same
> > > > > > behaviour as if an error occured.
> > > > > > 
> > > > > > Signed-off-by: Lukas Straub <lukasstraub2@web.de>      
> > > > >     
> > > > > > +static void nbd_yank(void *opaque)
> > > > > > +{
> > > > > > +    BlockDriverState *bs = opaque;
> > > > > > +    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
> > > > > > +
> > > > > > +    atomic_set(&s->state, NBD_CLIENT_QUIT);      
> > > > > 
> > > > > I think I was expecting a shutdown on the socket here - why doesn't it
> > > > > have one?    
> > > > 
> > > > For nbd, we register two yank functions: This one and we enable
> > > > the yank feature on the qio channel (see function
> > > > nbd_establish_connection below).    
> > > 
> > > As mentioned on the earlier patch, I don't want to see any yank
> > > code in the QIOChannel object directly. This nbd_yank function
> > > can simply call the qio_channel_shutdown() function directly
> > > and avoid need for modifying the QIOChannel object with yank
> > > support.  
> > 
> > Hi,
> > Looking at it again, the problem is not with registering the yank functions, but with tracking the lifetime of it. Suppose we add qio_channel_shutdown to the yank_nbd function. Then we need to unregister it whenever the QIOChannel object is freed.
> > 
> > In the code that would lead to the following constructs in a lot of places:
> >      if (local_err) {
> >          yank_unregister_function(s->yank_name, yank_nbd, bs);
> >          object_unref(OBJECT(sioc));
> >          error_propagate(errp, local_err);
> >          return NULL;
> >      }  
> 
> The nbd patch here already has a yank_unregister_function() so I'm
> not seeing anything changes in that respect. The "yank_nbd" function
> should check that the I/O channel is non-NULL before calling the
> qio_channel_shutdown method.

Hmm, but if object_unref frees the object, it doesn't set the pointer to NULL does it?

> > If you don't want the code in QIOChannel I guess I can create a
> > subclass (YankableChannelSocket?) of QIOChannelSocket. What do
> > you think?  
> 
> That's no better, and I don't see any compelling need for it as calling
> yank_unregister_function is something nbd already does in its nbd_close
> function. It isn't a burden for the other backends to do similarly.
> 
> 
> 
> Regards,
> Daniel
Daniel P. Berrangé May 15, 2020, 10:26 a.m. UTC | #8
On Fri, May 15, 2020 at 12:14:47PM +0200, Lukas Straub wrote:
> On Fri, 15 May 2020 11:04:13 +0100
> Daniel P. Berrangé <berrange@redhat.com> wrote:
> 
> > On Fri, May 15, 2020 at 11:48:18AM +0200, Lukas Straub wrote:
> > > On Tue, 12 May 2020 09:54:58 +0100
> > > Daniel P. Berrangé <berrange@redhat.com> wrote:
> > >   
> > > > On Mon, May 11, 2020 at 07:05:24PM +0200, Lukas Straub wrote:  
> > > > > On Mon, 11 May 2020 17:19:09 +0100
> > > > > "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> > > > >     
> > > > > > * Lukas Straub (lukasstraub2@web.de) wrote:    
> > > > > > > Add yank option, pass it to the socket-channel and register a yank
> > > > > > > function which sets s->state = NBD_CLIENT_QUIT. This is the same
> > > > > > > behaviour as if an error occured.
> > > > > > > 
> > > > > > > Signed-off-by: Lukas Straub <lukasstraub2@web.de>      
> > > > > >     
> > > > > > > +static void nbd_yank(void *opaque)
> > > > > > > +{
> > > > > > > +    BlockDriverState *bs = opaque;
> > > > > > > +    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
> > > > > > > +
> > > > > > > +    atomic_set(&s->state, NBD_CLIENT_QUIT);      
> > > > > > 
> > > > > > I think I was expecting a shutdown on the socket here - why doesn't it
> > > > > > have one?    
> > > > > 
> > > > > For nbd, we register two yank functions: This one and we enable
> > > > > the yank feature on the qio channel (see function
> > > > > nbd_establish_connection below).    
> > > > 
> > > > As mentioned on the earlier patch, I don't want to see any yank
> > > > code in the QIOChannel object directly. This nbd_yank function
> > > > can simply call the qio_channel_shutdown() function directly
> > > > and avoid need for modifying the QIOChannel object with yank
> > > > support.  
> > > 
> > > Hi,
> > > Looking at it again, the problem is not with registering the yank functions, but with tracking the lifetime of it. Suppose we add qio_channel_shutdown to the yank_nbd function. Then we need to unregister it whenever the QIOChannel object is freed.
> > > 
> > > In the code that would lead to the following constructs in a lot of places:
> > >      if (local_err) {
> > >          yank_unregister_function(s->yank_name, yank_nbd, bs);
> > >          object_unref(OBJECT(sioc));
> > >          error_propagate(errp, local_err);
> > >          return NULL;
> > >      }  
> > 
> > The nbd patch here already has a yank_unregister_function() so I'm
> > not seeing anything changes in that respect. The "yank_nbd" function
> > should check that the I/O channel is non-NULL before calling the
> > qio_channel_shutdown method.
> 
> Hmm, but if object_unref frees the object, it doesn't set the
> pointer to NULL does it?

So set  "ioc = NULL" after calling object_unref. AFAICT, nbd already
does exactly this.


Regards,
Daniel
Lukas Straub May 15, 2020, 1:03 p.m. UTC | #9
On Fri, 15 May 2020 11:26:13 +0100
Daniel P. Berrangé <berrange@redhat.com> wrote:

> On Fri, May 15, 2020 at 12:14:47PM +0200, Lukas Straub wrote:
> > On Fri, 15 May 2020 11:04:13 +0100
> > Daniel P. Berrangé <berrange@redhat.com> wrote:
> >   
> > > On Fri, May 15, 2020 at 11:48:18AM +0200, Lukas Straub wrote:  
> > > > On Tue, 12 May 2020 09:54:58 +0100
> > > > Daniel P. Berrangé <berrange@redhat.com> wrote:
> > > >     
> > > > > On Mon, May 11, 2020 at 07:05:24PM +0200, Lukas Straub wrote:    
> > > > > > On Mon, 11 May 2020 17:19:09 +0100
> > > > > > "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> > > > > >       
> > > > > > > * Lukas Straub (lukasstraub2@web.de) wrote:      
> > > > > > > > Add yank option, pass it to the socket-channel and register a yank
> > > > > > > > function which sets s->state = NBD_CLIENT_QUIT. This is the same
> > > > > > > > behaviour as if an error occured.
> > > > > > > > 
> > > > > > > > Signed-off-by: Lukas Straub <lukasstraub2@web.de>        
> > > > > > >       
> > > > > > > > +static void nbd_yank(void *opaque)
> > > > > > > > +{
> > > > > > > > +    BlockDriverState *bs = opaque;
> > > > > > > > +    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
> > > > > > > > +
> > > > > > > > +    atomic_set(&s->state, NBD_CLIENT_QUIT);        
> > > > > > > 
> > > > > > > I think I was expecting a shutdown on the socket here - why doesn't it
> > > > > > > have one?      
> > > > > > 
> > > > > > For nbd, we register two yank functions: This one and we enable
> > > > > > the yank feature on the qio channel (see function
> > > > > > nbd_establish_connection below).      
> > > > > 
> > > > > As mentioned on the earlier patch, I don't want to see any yank
> > > > > code in the QIOChannel object directly. This nbd_yank function
> > > > > can simply call the qio_channel_shutdown() function directly
> > > > > and avoid need for modifying the QIOChannel object with yank
> > > > > support.    
> > > > 
> > > > Hi,
> > > > Looking at it again, the problem is not with registering the yank functions, but with tracking the lifetime of it. Suppose we add qio_channel_shutdown to the yank_nbd function. Then we need to unregister it whenever the QIOChannel object is freed.
> > > > 
> > > > In the code that would lead to the following constructs in a lot of places:
> > > >      if (local_err) {
> > > >          yank_unregister_function(s->yank_name, yank_nbd, bs);
> > > >          object_unref(OBJECT(sioc));
> > > >          error_propagate(errp, local_err);
> > > >          return NULL;
> > > >      }    
> > > 
> > > The nbd patch here already has a yank_unregister_function() so I'm
> > > not seeing anything changes in that respect. The "yank_nbd" function
> > > should check that the I/O channel is non-NULL before calling the
> > > qio_channel_shutdown method.  
> > 
> > Hmm, but if object_unref frees the object, it doesn't set the
> > pointer to NULL does it?  
> 
> So set  "ioc = NULL" after calling object_unref. AFAICT, nbd already
> does exactly this.

I see 3 options to do that in a thread-safe manner:
1. Introduce a mutex here.
2. Use atomics to check for NULL and increase the reference count at the same time in yank_nbd so it isn't freed while calling qio_channel_shutdown on it. (I'm unsure how to do that)
3. Do it like it is currently done (but with the new subclass). We get thread safety for free trough the mutex in yank.c.

I prefer option 3 :)

The subclass can live in yank.c. There'd have to be 2 yank functions again but a comment in yank_nbd should suffice.

Regards,
Lukas Straub
Daniel P. Berrangé May 15, 2020, 1:45 p.m. UTC | #10
On Fri, May 15, 2020 at 03:03:30PM +0200, Lukas Straub wrote:
> On Fri, 15 May 2020 11:26:13 +0100
> Daniel P. Berrangé <berrange@redhat.com> wrote:
> 
> > On Fri, May 15, 2020 at 12:14:47PM +0200, Lukas Straub wrote:
> > > On Fri, 15 May 2020 11:04:13 +0100
> > > Daniel P. Berrangé <berrange@redhat.com> wrote:
> > >   
> > > > On Fri, May 15, 2020 at 11:48:18AM +0200, Lukas Straub wrote:  
> > > > > On Tue, 12 May 2020 09:54:58 +0100
> > > > > Daniel P. Berrangé <berrange@redhat.com> wrote:
> > > > >     
> > > > > > On Mon, May 11, 2020 at 07:05:24PM +0200, Lukas Straub wrote:    
> > > > > > > On Mon, 11 May 2020 17:19:09 +0100
> > > > > > > "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> > > > > > >       
> > > > > > > > * Lukas Straub (lukasstraub2@web.de) wrote:      
> > > > > > > > > Add yank option, pass it to the socket-channel and register a yank
> > > > > > > > > function which sets s->state = NBD_CLIENT_QUIT. This is the same
> > > > > > > > > behaviour as if an error occured.
> > > > > > > > > 
> > > > > > > > > Signed-off-by: Lukas Straub <lukasstraub2@web.de>        
> > > > > > > >       
> > > > > > > > > +static void nbd_yank(void *opaque)
> > > > > > > > > +{
> > > > > > > > > +    BlockDriverState *bs = opaque;
> > > > > > > > > +    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
> > > > > > > > > +
> > > > > > > > > +    atomic_set(&s->state, NBD_CLIENT_QUIT);        
> > > > > > > > 
> > > > > > > > I think I was expecting a shutdown on the socket here - why doesn't it
> > > > > > > > have one?      
> > > > > > > 
> > > > > > > For nbd, we register two yank functions: This one and we enable
> > > > > > > the yank feature on the qio channel (see function
> > > > > > > nbd_establish_connection below).      
> > > > > > 
> > > > > > As mentioned on the earlier patch, I don't want to see any yank
> > > > > > code in the QIOChannel object directly. This nbd_yank function
> > > > > > can simply call the qio_channel_shutdown() function directly
> > > > > > and avoid need for modifying the QIOChannel object with yank
> > > > > > support.    
> > > > > 
> > > > > Hi,
> > > > > Looking at it again, the problem is not with registering the yank functions, but with tracking the lifetime of it. Suppose we add qio_channel_shutdown to the yank_nbd function. Then we need to unregister it whenever the QIOChannel object is freed.
> > > > > 
> > > > > In the code that would lead to the following constructs in a lot of places:
> > > > >      if (local_err) {
> > > > >          yank_unregister_function(s->yank_name, yank_nbd, bs);
> > > > >          object_unref(OBJECT(sioc));
> > > > >          error_propagate(errp, local_err);
> > > > >          return NULL;
> > > > >      }    
> > > > 
> > > > The nbd patch here already has a yank_unregister_function() so I'm
> > > > not seeing anything changes in that respect. The "yank_nbd" function
> > > > should check that the I/O channel is non-NULL before calling the
> > > > qio_channel_shutdown method.  
> > > 
> > > Hmm, but if object_unref frees the object, it doesn't set the
> > > pointer to NULL does it?  
> > 
> > So set  "ioc = NULL" after calling object_unref. AFAICT, nbd already
> > does exactly this.
> 
> I see 3 options to do that in a thread-safe manner:
> 1. Introduce a mutex here.
> 2. Use atomics to check for NULL and increase the reference count at the same time in yank_nbd so it isn't freed while calling qio_channel_shutdown on it. (I'm unsure how to do that)
> 3. Do it like it is currently done (but with the new subclass). We get thread safety for free trough the mutex in yank.c.

Oh, so the problem is that  the yank function can be invoked concurrently
with the object being unreffed.

In normal object finalizers, we just have to order things such that
yank_unregister_function() is called before object_unref(ioc) is
called.

The NBD code is slightly harder because we can close & re-open the
IO separately from the finalizer. eg in nbd_reconnect_attempt we
have

    if (s->ioc) {
        nbd_client_detach_aio_context(s->bs);
        object_unref(OBJECT(s->sioc));
        s->sioc = NULL;
        object_unref(OBJECT(s->ioc));
        s->ioc = NULL;
    }

    s->connect_status = nbd_client_connect(s->bs, &local_err);

If the io channel is not open, then we don't need a yank function
registered. So this code would changed to


    if (s->ioc) {
        nbd_client_detach_aio_context(s->bs);
	yank_unregister_function(...);
        object_unref(OBJECT(s->sioc));
        s->sioc = NULL;
        object_unref(OBJECT(s->ioc));
        s->ioc = NULL;
    }

    s->connect_status = nbd_client_connect(s->bs, &local_err);

The locking in yank_unregister_function() should ensure that any
currently executing yank callback has completed before the
yank_unregister_function() call returns. Thus it should be safe
to unref the cannel.

nbd_client_connect() will call yank_register_function() once it
has successfully started a new connection, which your patch already
handles IIUC.

Regards,
Daniel
diff mbox series

Patch

diff --git a/Makefile.objs b/Makefile.objs
index 889115775c..4b213b3e78 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -18,6 +18,7 @@  block-obj-y += block.o blockjob.o job.o
 block-obj-y += block/ scsi/
 block-obj-y += qemu-io-cmds.o
 block-obj-$(CONFIG_REPLICATION) += replication.o
+block-obj-y += yank.o
 
 block-obj-m = block/
 
diff --git a/block/nbd.c b/block/nbd.c
index 2160859f64..3c0fd3abb8 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -35,6 +35,7 @@ 
 #include "qemu/option.h"
 #include "qemu/cutils.h"
 #include "qemu/main-loop.h"
+#include "qemu/atomic.h"
 
 #include "qapi/qapi-visit-sockets.h"
 #include "qapi/qmp/qstring.h"
@@ -43,6 +44,8 @@ 
 #include "block/nbd.h"
 #include "block/block_int.h"
 
+#include "yank.h"
+
 #define EN_OPTSTR ":exportname="
 #define MAX_NBD_REQUESTS    16
 
@@ -91,6 +94,7 @@  typedef struct BDRVNBDState {
     QCryptoTLSCreds *tlscreds;
     const char *hostname;
     char *x_dirty_bitmap;
+    bool yank;
 } BDRVNBDState;
 
 static int nbd_client_connect(BlockDriverState *bs, Error **errp);
@@ -111,12 +115,12 @@  static void nbd_clear_bdrvstate(BDRVNBDState *s)
 static void nbd_channel_error(BDRVNBDState *s, int ret)
 {
     if (ret == -EIO) {
-        if (s->state == NBD_CLIENT_CONNECTED) {
+        if (atomic_read(&s->state) == NBD_CLIENT_CONNECTED) {
             s->state = s->reconnect_delay ? NBD_CLIENT_CONNECTING_WAIT :
                                             NBD_CLIENT_CONNECTING_NOWAIT;
         }
     } else {
-        if (s->state == NBD_CLIENT_CONNECTED) {
+        if (atomic_read(&s->state) == NBD_CLIENT_CONNECTED) {
             qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
         }
         s->state = NBD_CLIENT_QUIT;
@@ -167,7 +171,7 @@  static void nbd_client_attach_aio_context(BlockDriverState *bs,
      * s->connection_co is either yielded from nbd_receive_reply or from
      * nbd_co_reconnect_loop()
      */
-    if (s->state == NBD_CLIENT_CONNECTED) {
+    if (atomic_read(&s->state) == NBD_CLIENT_CONNECTED) {
         qio_channel_attach_aio_context(QIO_CHANNEL(s->ioc), new_context);
     }
 
@@ -206,7 +210,7 @@  static void nbd_teardown_connection(BlockDriverState *bs)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
 
-    if (s->state == NBD_CLIENT_CONNECTED) {
+    if (atomic_read(&s->state) == NBD_CLIENT_CONNECTED) {
         /* finish any pending coroutines */
         assert(s->ioc);
         qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
@@ -230,13 +234,14 @@  static void nbd_teardown_connection(BlockDriverState *bs)
 
 static bool nbd_client_connecting(BDRVNBDState *s)
 {
-    return s->state == NBD_CLIENT_CONNECTING_WAIT ||
-        s->state == NBD_CLIENT_CONNECTING_NOWAIT;
+    NBDClientState state = atomic_read(&s->state);
+    return state == NBD_CLIENT_CONNECTING_WAIT ||
+        state == NBD_CLIENT_CONNECTING_NOWAIT;
 }
 
 static bool nbd_client_connecting_wait(BDRVNBDState *s)
 {
-    return s->state == NBD_CLIENT_CONNECTING_WAIT;
+    return atomic_read(&s->state) == NBD_CLIENT_CONNECTING_WAIT;
 }
 
 static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
@@ -305,7 +310,7 @@  static coroutine_fn void nbd_co_reconnect_loop(BDRVNBDState *s)
     nbd_reconnect_attempt(s);
 
     while (nbd_client_connecting(s)) {
-        if (s->state == NBD_CLIENT_CONNECTING_WAIT &&
+        if (atomic_read(&s->state) == NBD_CLIENT_CONNECTING_WAIT &&
             qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time_ns > delay_ns)
         {
             s->state = NBD_CLIENT_CONNECTING_NOWAIT;
@@ -341,7 +346,7 @@  static coroutine_fn void nbd_connection_entry(void *opaque)
     int ret = 0;
     Error *local_err = NULL;
 
-    while (s->state != NBD_CLIENT_QUIT) {
+    while (atomic_read(&s->state) != NBD_CLIENT_QUIT) {
         /*
          * The NBD client can only really be considered idle when it has
          * yielded from qio_channel_readv_all_eof(), waiting for data. This is
@@ -356,7 +361,7 @@  static coroutine_fn void nbd_connection_entry(void *opaque)
             nbd_co_reconnect_loop(s);
         }
 
-        if (s->state != NBD_CLIENT_CONNECTED) {
+        if (atomic_read(&s->state) != NBD_CLIENT_CONNECTED) {
             continue;
         }
 
@@ -435,7 +440,7 @@  static int nbd_co_send_request(BlockDriverState *bs,
         qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
     }
 
-    if (s->state != NBD_CLIENT_CONNECTED) {
+    if (atomic_read(&s->state) != NBD_CLIENT_CONNECTED) {
         rc = -EIO;
         goto err;
     }
@@ -462,7 +467,7 @@  static int nbd_co_send_request(BlockDriverState *bs,
     if (qiov) {
         qio_channel_set_cork(s->ioc, true);
         rc = nbd_send_request(s->ioc, request);
-        if (rc >= 0 && s->state == NBD_CLIENT_CONNECTED) {
+        if (rc >= 0 && atomic_read(&s->state) == NBD_CLIENT_CONNECTED) {
             if (qio_channel_writev_all(s->ioc, qiov->iov, qiov->niov,
                                        NULL) < 0) {
                 rc = -EIO;
@@ -777,7 +782,7 @@  static coroutine_fn int nbd_co_do_receive_one_chunk(
     s->requests[i].receiving = true;
     qemu_coroutine_yield();
     s->requests[i].receiving = false;
-    if (s->state != NBD_CLIENT_CONNECTED) {
+    if (atomic_read(&s->state) != NBD_CLIENT_CONNECTED) {
         error_setg(errp, "Connection closed");
         return -EIO;
     }
@@ -936,7 +941,7 @@  static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
     NBDReply local_reply;
     NBDStructuredReplyChunk *chunk;
     Error *local_err = NULL;
-    if (s->state != NBD_CLIENT_CONNECTED) {
+    if (atomic_read(&s->state) != NBD_CLIENT_CONNECTED) {
         error_setg(&local_err, "Connection closed");
         nbd_iter_channel_error(iter, -EIO, &local_err);
         goto break_loop;
@@ -961,7 +966,8 @@  static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
     }
 
     /* Do not execute the body of NBD_FOREACH_REPLY_CHUNK for simple reply. */
-    if (nbd_reply_is_simple(reply) || s->state != NBD_CLIENT_CONNECTED) {
+    if (nbd_reply_is_simple(reply) ||
+        atomic_read(&s->state) != NBD_CLIENT_CONNECTED) {
         goto break_loop;
     }
 
@@ -1395,6 +1401,14 @@  static int nbd_client_reopen_prepare(BDRVReopenState *state,
     return 0;
 }
 
+static void nbd_yank(void *opaque)
+{
+    BlockDriverState *bs = opaque;
+    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
+
+    atomic_set(&s->state, NBD_CLIENT_QUIT);
+}
+
 static void nbd_client_close(BlockDriverState *bs)
 {
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
@@ -1407,14 +1421,17 @@  static void nbd_client_close(BlockDriverState *bs)
     nbd_teardown_connection(bs);
 }
 
-static QIOChannelSocket *nbd_establish_connection(SocketAddress *saddr,
+static QIOChannelSocket *nbd_establish_connection(BlockDriverState *bs,
+                                                  SocketAddress *saddr,
                                                   Error **errp)
 {
+    BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     QIOChannelSocket *sioc;
     Error *local_err = NULL;
 
     sioc = qio_channel_socket_new();
     qio_channel_set_name(QIO_CHANNEL(sioc), "nbd-client");
+    qio_channel_set_yank(QIO_CHANNEL(sioc), s->yank);
 
     qio_channel_socket_connect_sync(sioc, saddr, &local_err);
     if (local_err) {
@@ -1438,7 +1455,7 @@  static int nbd_client_connect(BlockDriverState *bs, Error **errp)
      * establish TCP connection, return error if it fails
      * TODO: Configurable retry-until-timeout behaviour.
      */
-    QIOChannelSocket *sioc = nbd_establish_connection(s->saddr, errp);
+    QIOChannelSocket *sioc = nbd_establish_connection(bs, s->saddr, errp);
 
     if (!sioc) {
         return -ECONNREFUSED;
@@ -1829,6 +1846,12 @@  static QemuOptsList nbd_runtime_opts = {
                     "future requests before a successful reconnect will "
                     "immediately fail. Default 0",
         },
+        {
+            .name = "yank",
+            .type = QEMU_OPT_BOOL,
+            .help = "Forcibly close the connection and don't attempt to "
+                    "reconnect when the 'yank' qmp command is executed.",
+        },
         { /* end of list */ }
     },
 };
@@ -1888,6 +1911,8 @@  static int nbd_process_options(BlockDriverState *bs, QDict *options,
 
     s->reconnect_delay = qemu_opt_get_number(opts, "reconnect-delay", 0);
 
+    s->yank = qemu_opt_get_bool(opts, "yank", false);
+
     ret = 0;
 
  error:
@@ -1921,6 +1946,10 @@  static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
     /* successfully connected */
     s->state = NBD_CLIENT_CONNECTED;
 
+    if (s->yank) {
+        yank_register_function(nbd_yank, bs);
+    }
+
     s->connection_co = qemu_coroutine_create(nbd_connection_entry, s);
     bdrv_inc_in_flight(bs);
     aio_co_schedule(bdrv_get_aio_context(bs), s->connection_co);
@@ -1972,6 +2001,11 @@  static void nbd_close(BlockDriverState *bs)
     BDRVNBDState *s = bs->opaque;
 
     nbd_client_close(bs);
+
+    if (s->yank) {
+        yank_unregister_function(nbd_yank, bs);
+    }
+
     nbd_clear_bdrvstate(s);
 }
 
diff --git a/qapi/block-core.json b/qapi/block-core.json
index 943df1926a..1c1578160e 100644
--- a/qapi/block-core.json
+++ b/qapi/block-core.json
@@ -3862,6 +3862,8 @@ 
 #                   reconnect. After that time, any delayed requests and all
 #                   future requests before a successful reconnect will
 #                   immediately fail. Default 0 (Since 4.2)
+# @yank: Forcibly close the connection and don't attempt to reconnect when
+#        the 'yank' qmp command is executed. (Since: 5.1)
 #
 # Since: 2.9
 ##
@@ -3870,7 +3872,8 @@ 
             '*export': 'str',
             '*tls-creds': 'str',
             '*x-dirty-bitmap': 'str',
-            '*reconnect-delay': 'uint32' } }
+            '*reconnect-delay': 'uint32',
+	    'yank': 'bool' } }
 
 ##
 # @BlockdevOptionsRaw: