diff mbox series

[v3,14/33] nbd: move connection code from block/nbd to nbd/client-connection

Message ID 20210416080911.83197-15-vsementsov@virtuozzo.com (mailing list archive)
State New
Headers show
Series block/nbd: rework client connection | expand

Commit Message

Vladimir Sementsov-Ogievskiy April 16, 2021, 8:08 a.m. UTC
We now have bs-independent connection API, which consists of four
functions:

  nbd_client_connection_new()
  nbd_client_connection_unref()
  nbd_co_establish_connection()
  nbd_co_establish_connection_cancel()

Move them to a separate file together with NBDClientConnection
structure which becomes private to the new API.

Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
---
 include/block/nbd.h     |  11 +++
 block/nbd.c             | 187 -----------------------------------
 nbd/client-connection.c | 212 ++++++++++++++++++++++++++++++++++++++++
 nbd/meson.build         |   1 +
 4 files changed, 224 insertions(+), 187 deletions(-)
 create mode 100644 nbd/client-connection.c

Comments

Roman Kagan April 27, 2021, 10:45 p.m. UTC | #1
On Fri, Apr 16, 2021 at 11:08:52AM +0300, Vladimir Sementsov-Ogievskiy wrote:
> We now have bs-independent connection API, which consists of four
> functions:
> 
>   nbd_client_connection_new()
>   nbd_client_connection_unref()
>   nbd_co_establish_connection()
>   nbd_co_establish_connection_cancel()
> 
> Move them to a separate file together with NBDClientConnection
> structure which becomes private to the new API.
> 
> Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
> ---
>  include/block/nbd.h     |  11 +++
>  block/nbd.c             | 187 -----------------------------------
>  nbd/client-connection.c | 212 ++++++++++++++++++++++++++++++++++++++++
>  nbd/meson.build         |   1 +
>  4 files changed, 224 insertions(+), 187 deletions(-)
>  create mode 100644 nbd/client-connection.c
> 
> diff --git a/include/block/nbd.h b/include/block/nbd.h
> index 5f34d23bb0..57381be76f 100644
> --- a/include/block/nbd.h
> +++ b/include/block/nbd.h
> @@ -406,4 +406,15 @@ const char *nbd_info_lookup(uint16_t info);
>  const char *nbd_cmd_lookup(uint16_t info);
>  const char *nbd_err_lookup(int err);
>  
> +/* nbd/client-connection.c */
> +typedef struct NBDClientConnection NBDClientConnection;
> +
> +NBDClientConnection *nbd_client_connection_new(const SocketAddress *saddr);
> +void nbd_client_connection_release(NBDClientConnection *conn);
> +
> +QIOChannelSocket *coroutine_fn
> +nbd_co_establish_connection(NBDClientConnection *conn, Error **errp);
> +
> +void coroutine_fn nbd_co_establish_connection_cancel(NBDClientConnection *conn);
> +
>  #endif
> diff --git a/block/nbd.c b/block/nbd.c
> index 8531d019b2..9bd68dcf10 100644
> --- a/block/nbd.c
> +++ b/block/nbd.c
> @@ -66,24 +66,6 @@ typedef enum NBDClientState {
>      NBD_CLIENT_QUIT
>  } NBDClientState;
>  
> -typedef struct NBDClientConnection {
> -    /* Initialization constants */
> -    SocketAddress *saddr; /* address to connect to */
> -
> -    /*
> -     * Result of last attempt. Valid in FAIL and SUCCESS states.
> -     * If you want to steal error, don't forget to set pointer to NULL.
> -     */
> -    QIOChannelSocket *sioc;
> -    Error *err;
> -
> -    QemuMutex mutex;
> -    /* All further fields are protected by mutex */
> -    bool running; /* thread is running now */
> -    bool detached; /* thread is detached and should cleanup the state */
> -    Coroutine *wait_co; /* nbd_co_establish_connection() wait in yield() */
> -} NBDClientConnection;
> -
>  typedef struct BDRVNBDState {
>      QIOChannelSocket *sioc; /* The master data channel */
>      QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
> @@ -118,12 +100,8 @@ typedef struct BDRVNBDState {
>      NBDClientConnection *conn;
>  } BDRVNBDState;
>  
> -static void nbd_client_connection_release(NBDClientConnection *conn);
>  static int nbd_establish_connection(BlockDriverState *bs, SocketAddress *saddr,
>                                      Error **errp);
> -static coroutine_fn QIOChannelSocket *
> -nbd_co_establish_connection(NBDClientConnection *conn, Error **errp);
> -static void nbd_co_establish_connection_cancel(NBDClientConnection *conn);
>  static int nbd_client_handshake(BlockDriverState *bs, Error **errp);
>  static void nbd_yank(void *opaque);
>  
> @@ -340,171 +318,6 @@ static bool nbd_client_connecting_wait(BDRVNBDState *s)
>      return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT;
>  }
>  
> -static NBDClientConnection *
> -nbd_client_connection_new(const SocketAddress *saddr)
> -{
> -    NBDClientConnection *conn = g_new(NBDClientConnection, 1);
> -
> -    *conn = (NBDClientConnection) {
> -        .saddr = QAPI_CLONE(SocketAddress, saddr),
> -    };
> -
> -    qemu_mutex_init(&conn->mutex);
> -
> -    return conn;
> -}
> -
> -static void nbd_client_connection_do_free(NBDClientConnection *conn)
> -{
> -    if (conn->sioc) {
> -        qio_channel_close(QIO_CHANNEL(conn->sioc), NULL);
> -        object_unref(OBJECT(conn->sioc));
> -    }
> -    error_free(conn->err);
> -    qapi_free_SocketAddress(conn->saddr);
> -    g_free(conn);
> -}
> -
> -static void *connect_thread_func(void *opaque)
> -{
> -    NBDClientConnection *conn = opaque;
> -    bool do_free;
> -    int ret;
> -
> -    conn->sioc = qio_channel_socket_new();
> -
> -    error_free(conn->err);
> -    conn->err = NULL;
> -    ret = qio_channel_socket_connect_sync(conn->sioc, conn->saddr, &conn->err);
> -    if (ret < 0) {
> -        object_unref(OBJECT(conn->sioc));
> -        conn->sioc = NULL;
> -    }
> -
> -    qemu_mutex_lock(&conn->mutex);
> -
> -    assert(conn->running);
> -    conn->running = false;
> -    if (conn->wait_co) {
> -        aio_co_schedule(NULL, conn->wait_co);
> -        conn->wait_co = NULL;
> -    }
> -    do_free = conn->detached;
> -
> -    qemu_mutex_unlock(&conn->mutex);
> -
> -    if (do_free) {
> -        nbd_client_connection_do_free(conn);
> -    }
> -
> -    return NULL;
> -}
> -
> -static void nbd_client_connection_release(NBDClientConnection *conn)
> -{
> -    bool do_free;
> -
> -    if (!conn) {
> -        return;
> -    }
> -
> -    qemu_mutex_lock(&conn->mutex);
> -    if (conn->running) {
> -        conn->detached = true;
> -    }
> -    do_free = !conn->running && !conn->detached;
> -    qemu_mutex_unlock(&conn->mutex);
> -
> -    if (do_free) {
> -        nbd_client_connection_do_free(conn);
> -    }
> -}
> -
> -/*
> - * Get a new connection in context of @conn:
> - *   if thread is running, wait for completion
> - *   if thread is already succeeded in background, and user didn't get the
> - *     result, just return it now
> - *   otherwise if thread is not running, start a thread and wait for completion
> - */
> -static coroutine_fn QIOChannelSocket *
> -nbd_co_establish_connection(NBDClientConnection *conn, Error **errp)
> -{
> -    QIOChannelSocket *sioc = NULL;
> -    QemuThread thread;
> -
> -    qemu_mutex_lock(&conn->mutex);
> -
> -    /*
> -     * Don't call nbd_co_establish_connection() in several coroutines in
> -     * parallel. Only one call at once is supported.
> -     */
> -    assert(!conn->wait_co);
> -
> -    if (!conn->running) {
> -        if (conn->sioc) {
> -            /* Previous attempt finally succeeded in background */
> -            sioc = g_steal_pointer(&conn->sioc);
> -            qemu_mutex_unlock(&conn->mutex);
> -
> -            return sioc;
> -        }
> -
> -        conn->running = true;
> -        error_free(conn->err);
> -        conn->err = NULL;
> -        qemu_thread_create(&thread, "nbd-connect",
> -                           connect_thread_func, conn, QEMU_THREAD_DETACHED);
> -    }
> -
> -    conn->wait_co = qemu_coroutine_self();
> -
> -    qemu_mutex_unlock(&conn->mutex);
> -
> -    /*
> -     * We are going to wait for connect-thread finish, but
> -     * nbd_co_establish_connection_cancel() can interrupt.
> -     */
> -    qemu_coroutine_yield();
> -
> -    qemu_mutex_lock(&conn->mutex);
> -
> -    if (conn->running) {
> -        /*
> -         * Obviously, drained section wants to start. Report the attempt as
> -         * failed. Still connect thread is executing in background, and its
> -         * result may be used for next connection attempt.
> -         */
> -        error_setg(errp, "Connection attempt cancelled by other operation");
> -    } else {
> -        error_propagate(errp, conn->err);
> -        conn->err = NULL;
> -        sioc = g_steal_pointer(&conn->sioc);
> -    }
> -
> -    qemu_mutex_unlock(&conn->mutex);
> -
> -    return sioc;
> -}
> -
> -/*
> - * nbd_co_establish_connection_cancel
> - * Cancel nbd_co_establish_connection() asynchronously. Note, that it doesn't
> - * stop the thread itself neither close the socket. It just safely wakes
> - * nbd_co_establish_connection() sleeping in the yield().
> - */
> -static void nbd_co_establish_connection_cancel(NBDClientConnection *conn)
> -{
> -    qemu_mutex_lock(&conn->mutex);
> -
> -    if (conn->wait_co) {
> -        aio_co_schedule(NULL, conn->wait_co);
> -        conn->wait_co = NULL;
> -    }
> -
> -    qemu_mutex_unlock(&conn->mutex);
> -}
> -
>  static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
>  {
>      int ret;
> diff --git a/nbd/client-connection.c b/nbd/client-connection.c
> new file mode 100644
> index 0000000000..4e39a5b1af
> --- /dev/null
> +++ b/nbd/client-connection.c
> @@ -0,0 +1,212 @@
> +/*
> + * QEMU Block driver for  NBD
> + *
> + * Copyright (c) 2021 Virtuozzo International GmbH.
> + *
> + * Permission is hereby granted, free of charge, to any person obtaining a copy
> + * of this software and associated documentation files (the "Software"), to deal
> + * in the Software without restriction, including without limitation the rights
> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
> + * copies of the Software, and to permit persons to whom the Software is
> + * furnished to do so, subject to the following conditions:
> + *
> + * The above copyright notice and this permission notice shall be included in
> + * all copies or substantial portions of the Software.
> + *
> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
> + * THE SOFTWARE.
> + */
> +
> +#include "qemu/osdep.h"
> +
> +#include "block/nbd.h"
> +
> +#include "qapi/qapi-visit-sockets.h"
> +#include "qapi/clone-visitor.h"
> +
> +struct NBDClientConnection {
> +    /* Initialization constants */
> +    SocketAddress *saddr; /* address to connect to */
> +
> +    /*
> +     * Result of last attempt. Valid in FAIL and SUCCESS states.
> +     * If you want to steal error, don't forget to set pointer to NULL.
> +     */
> +    QIOChannelSocket *sioc;
> +    Error *err;

These two are also manipulated under the mutex.  Consider also updating
the comment: both these pointers are to be "stolen" by the caller, with
the former being valid when the connection succeeds and the latter
otherwise.

Roman.

> +
> +    QemuMutex mutex;
> +    /* All further fields are protected by mutex */
> +    bool running; /* thread is running now */
> +    bool detached; /* thread is detached and should cleanup the state */
> +    Coroutine *wait_co; /* nbd_co_establish_connection() wait in yield() */
> +};
> +
> +NBDClientConnection *nbd_client_connection_new(const SocketAddress *saddr)
> +{
> +    NBDClientConnection *conn = g_new(NBDClientConnection, 1);
> +
> +    *conn = (NBDClientConnection) {
> +        .saddr = QAPI_CLONE(SocketAddress, saddr),
> +    };
> +
> +    qemu_mutex_init(&conn->mutex);
> +
> +    return conn;
> +}
> +
> +static void nbd_client_connection_do_free(NBDClientConnection *conn)
> +{
> +    if (conn->sioc) {
> +        qio_channel_close(QIO_CHANNEL(conn->sioc), NULL);
> +        object_unref(OBJECT(conn->sioc));
> +    }
> +    error_free(conn->err);
> +    qapi_free_SocketAddress(conn->saddr);
> +    g_free(conn);
> +}
> +
> +static void *connect_thread_func(void *opaque)
> +{
> +    NBDClientConnection *conn = opaque;
> +    bool do_free;
> +    int ret;
> +
> +    conn->sioc = qio_channel_socket_new();
> +
> +    error_free(conn->err);
> +    conn->err = NULL;
> +    ret = qio_channel_socket_connect_sync(conn->sioc, conn->saddr, &conn->err);
> +    if (ret < 0) {
> +        object_unref(OBJECT(conn->sioc));
> +        conn->sioc = NULL;
> +    }
> +
> +    qemu_mutex_lock(&conn->mutex);
> +
> +    assert(conn->running);
> +    conn->running = false;
> +    if (conn->wait_co) {
> +        aio_co_schedule(NULL, conn->wait_co);
> +        conn->wait_co = NULL;
> +    }
> +    do_free = conn->detached;
> +
> +    qemu_mutex_unlock(&conn->mutex);
> +
> +    if (do_free) {
> +        nbd_client_connection_do_free(conn);
> +    }
> +
> +    return NULL;
> +}
> +
> +void nbd_client_connection_release(NBDClientConnection *conn)
> +{
> +    bool do_free;
> +
> +    if (!conn) {
> +        return;
> +    }
> +
> +    qemu_mutex_lock(&conn->mutex);
> +    if (conn->running) {
> +        conn->detached = true;
> +    }
> +    do_free = !conn->running && !conn->detached;
> +    qemu_mutex_unlock(&conn->mutex);
> +
> +    if (do_free) {
> +        nbd_client_connection_do_free(conn);
> +    }
> +}
> +
> +/*
> + * Get a new connection in context of @conn:
> + *   if thread is running, wait for completion
> + *   if thread is already succeeded in background, and user didn't get the
> + *     result, just return it now
> + *   otherwise if thread is not running, start a thread and wait for completion
> + */
> +QIOChannelSocket *coroutine_fn
> +nbd_co_establish_connection(NBDClientConnection *conn, Error **errp)
> +{
> +    QIOChannelSocket *sioc = NULL;
> +    QemuThread thread;
> +
> +    qemu_mutex_lock(&conn->mutex);
> +
> +    /*
> +     * Don't call nbd_co_establish_connection() in several coroutines in
> +     * parallel. Only one call at once is supported.
> +     */
> +    assert(!conn->wait_co);
> +
> +    if (!conn->running) {
> +        if (conn->sioc) {
> +            /* Previous attempt finally succeeded in background */
> +            sioc = g_steal_pointer(&conn->sioc);
> +            qemu_mutex_unlock(&conn->mutex);
> +
> +            return sioc;
> +        }
> +
> +        conn->running = true;
> +        error_free(conn->err);
> +        conn->err = NULL;
> +        qemu_thread_create(&thread, "nbd-connect",
> +                           connect_thread_func, conn, QEMU_THREAD_DETACHED);
> +    }
> +
> +    conn->wait_co = qemu_coroutine_self();
> +
> +    qemu_mutex_unlock(&conn->mutex);
> +
> +    /*
> +     * We are going to wait for connect-thread finish, but
> +     * nbd_co_establish_connection_cancel() can interrupt.
> +     */
> +    qemu_coroutine_yield();
> +
> +    qemu_mutex_lock(&conn->mutex);
> +
> +    if (conn->running) {
> +        /*
> +         * Obviously, drained section wants to start. Report the attempt as
> +         * failed. Still connect thread is executing in background, and its
> +         * result may be used for next connection attempt.
> +         */
> +        error_setg(errp, "Connection attempt cancelled by other operation");
> +    } else {
> +        error_propagate(errp, conn->err);
> +        conn->err = NULL;
> +        sioc = g_steal_pointer(&conn->sioc);
> +    }
> +
> +    qemu_mutex_unlock(&conn->mutex);
> +
> +    return sioc;
> +}
> +
> +/*
> + * nbd_co_establish_connection_cancel
> + * Cancel nbd_co_establish_connection() asynchronously. Note, that it doesn't
> + * stop the thread itself neither close the socket. It just safely wakes
> + * nbd_co_establish_connection() sleeping in the yield().
> + */
> +void coroutine_fn nbd_co_establish_connection_cancel(NBDClientConnection *conn)
> +{
> +    qemu_mutex_lock(&conn->mutex);
> +
> +    if (conn->wait_co) {
> +        aio_co_schedule(NULL, conn->wait_co);
> +        conn->wait_co = NULL;
> +    }
> +
> +    qemu_mutex_unlock(&conn->mutex);
> +}
> diff --git a/nbd/meson.build b/nbd/meson.build
> index 2baaa36948..b26d70565e 100644
> --- a/nbd/meson.build
> +++ b/nbd/meson.build
> @@ -1,5 +1,6 @@
>  block_ss.add(files(
>    'client.c',
> +  'client-connection.c',
>    'common.c',
>  ))
>  blockdev_ss.add(files(
> -- 
> 2.29.2
>
Vladimir Sementsov-Ogievskiy April 28, 2021, 8:14 a.m. UTC | #2
28.04.2021 01:45, Roman Kagan wrote:
> On Fri, Apr 16, 2021 at 11:08:52AM +0300, Vladimir Sementsov-Ogievskiy wrote:
>> We now have bs-independent connection API, which consists of four
>> functions:
>>
>>    nbd_client_connection_new()
>>    nbd_client_connection_unref()
>>    nbd_co_establish_connection()
>>    nbd_co_establish_connection_cancel()
>>
>> Move them to a separate file together with NBDClientConnection
>> structure which becomes private to the new API.
>>
>> Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
>> ---
>>   include/block/nbd.h     |  11 +++
>>   block/nbd.c             | 187 -----------------------------------
>>   nbd/client-connection.c | 212 ++++++++++++++++++++++++++++++++++++++++
>>   nbd/meson.build         |   1 +
>>   4 files changed, 224 insertions(+), 187 deletions(-)
>>   create mode 100644 nbd/client-connection.c
>>
>> diff --git a/include/block/nbd.h b/include/block/nbd.h
>> index 5f34d23bb0..57381be76f 100644
>> --- a/include/block/nbd.h
>> +++ b/include/block/nbd.h
>> @@ -406,4 +406,15 @@ const char *nbd_info_lookup(uint16_t info);
>>   const char *nbd_cmd_lookup(uint16_t info);
>>   const char *nbd_err_lookup(int err);
>>   
>> +/* nbd/client-connection.c */
>> +typedef struct NBDClientConnection NBDClientConnection;
>> +
>> +NBDClientConnection *nbd_client_connection_new(const SocketAddress *saddr);
>> +void nbd_client_connection_release(NBDClientConnection *conn);
>> +
>> +QIOChannelSocket *coroutine_fn
>> +nbd_co_establish_connection(NBDClientConnection *conn, Error **errp);
>> +
>> +void coroutine_fn nbd_co_establish_connection_cancel(NBDClientConnection *conn);
>> +
>>   #endif
>> diff --git a/block/nbd.c b/block/nbd.c
>> index 8531d019b2..9bd68dcf10 100644
>> --- a/block/nbd.c
>> +++ b/block/nbd.c
>> @@ -66,24 +66,6 @@ typedef enum NBDClientState {
>>       NBD_CLIENT_QUIT
>>   } NBDClientState;
>>   
>> -typedef struct NBDClientConnection {
>> -    /* Initialization constants */
>> -    SocketAddress *saddr; /* address to connect to */
>> -
>> -    /*
>> -     * Result of last attempt. Valid in FAIL and SUCCESS states.
>> -     * If you want to steal error, don't forget to set pointer to NULL.
>> -     */
>> -    QIOChannelSocket *sioc;
>> -    Error *err;
>> -
>> -    QemuMutex mutex;
>> -    /* All further fields are protected by mutex */
>> -    bool running; /* thread is running now */
>> -    bool detached; /* thread is detached and should cleanup the state */
>> -    Coroutine *wait_co; /* nbd_co_establish_connection() wait in yield() */
>> -} NBDClientConnection;
>> -
>>   typedef struct BDRVNBDState {
>>       QIOChannelSocket *sioc; /* The master data channel */
>>       QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
>> @@ -118,12 +100,8 @@ typedef struct BDRVNBDState {
>>       NBDClientConnection *conn;
>>   } BDRVNBDState;
>>   
>> -static void nbd_client_connection_release(NBDClientConnection *conn);
>>   static int nbd_establish_connection(BlockDriverState *bs, SocketAddress *saddr,
>>                                       Error **errp);
>> -static coroutine_fn QIOChannelSocket *
>> -nbd_co_establish_connection(NBDClientConnection *conn, Error **errp);
>> -static void nbd_co_establish_connection_cancel(NBDClientConnection *conn);
>>   static int nbd_client_handshake(BlockDriverState *bs, Error **errp);
>>   static void nbd_yank(void *opaque);
>>   
>> @@ -340,171 +318,6 @@ static bool nbd_client_connecting_wait(BDRVNBDState *s)
>>       return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT;
>>   }
>>   
>> -static NBDClientConnection *
>> -nbd_client_connection_new(const SocketAddress *saddr)
>> -{
>> -    NBDClientConnection *conn = g_new(NBDClientConnection, 1);
>> -
>> -    *conn = (NBDClientConnection) {
>> -        .saddr = QAPI_CLONE(SocketAddress, saddr),
>> -    };
>> -
>> -    qemu_mutex_init(&conn->mutex);
>> -
>> -    return conn;
>> -}
>> -
>> -static void nbd_client_connection_do_free(NBDClientConnection *conn)
>> -{
>> -    if (conn->sioc) {
>> -        qio_channel_close(QIO_CHANNEL(conn->sioc), NULL);
>> -        object_unref(OBJECT(conn->sioc));
>> -    }
>> -    error_free(conn->err);
>> -    qapi_free_SocketAddress(conn->saddr);
>> -    g_free(conn);
>> -}
>> -
>> -static void *connect_thread_func(void *opaque)
>> -{
>> -    NBDClientConnection *conn = opaque;
>> -    bool do_free;
>> -    int ret;
>> -
>> -    conn->sioc = qio_channel_socket_new();
>> -
>> -    error_free(conn->err);
>> -    conn->err = NULL;
>> -    ret = qio_channel_socket_connect_sync(conn->sioc, conn->saddr, &conn->err);
>> -    if (ret < 0) {
>> -        object_unref(OBJECT(conn->sioc));
>> -        conn->sioc = NULL;
>> -    }
>> -
>> -    qemu_mutex_lock(&conn->mutex);
>> -
>> -    assert(conn->running);
>> -    conn->running = false;
>> -    if (conn->wait_co) {
>> -        aio_co_schedule(NULL, conn->wait_co);
>> -        conn->wait_co = NULL;
>> -    }
>> -    do_free = conn->detached;
>> -
>> -    qemu_mutex_unlock(&conn->mutex);
>> -
>> -    if (do_free) {
>> -        nbd_client_connection_do_free(conn);
>> -    }
>> -
>> -    return NULL;
>> -}
>> -
>> -static void nbd_client_connection_release(NBDClientConnection *conn)
>> -{
>> -    bool do_free;
>> -
>> -    if (!conn) {
>> -        return;
>> -    }
>> -
>> -    qemu_mutex_lock(&conn->mutex);
>> -    if (conn->running) {
>> -        conn->detached = true;
>> -    }
>> -    do_free = !conn->running && !conn->detached;
>> -    qemu_mutex_unlock(&conn->mutex);
>> -
>> -    if (do_free) {
>> -        nbd_client_connection_do_free(conn);
>> -    }
>> -}
>> -
>> -/*
>> - * Get a new connection in context of @conn:
>> - *   if thread is running, wait for completion
>> - *   if thread is already succeeded in background, and user didn't get the
>> - *     result, just return it now
>> - *   otherwise if thread is not running, start a thread and wait for completion
>> - */
>> -static coroutine_fn QIOChannelSocket *
>> -nbd_co_establish_connection(NBDClientConnection *conn, Error **errp)
>> -{
>> -    QIOChannelSocket *sioc = NULL;
>> -    QemuThread thread;
>> -
>> -    qemu_mutex_lock(&conn->mutex);
>> -
>> -    /*
>> -     * Don't call nbd_co_establish_connection() in several coroutines in
>> -     * parallel. Only one call at once is supported.
>> -     */
>> -    assert(!conn->wait_co);
>> -
>> -    if (!conn->running) {
>> -        if (conn->sioc) {
>> -            /* Previous attempt finally succeeded in background */
>> -            sioc = g_steal_pointer(&conn->sioc);
>> -            qemu_mutex_unlock(&conn->mutex);
>> -
>> -            return sioc;
>> -        }
>> -
>> -        conn->running = true;
>> -        error_free(conn->err);
>> -        conn->err = NULL;
>> -        qemu_thread_create(&thread, "nbd-connect",
>> -                           connect_thread_func, conn, QEMU_THREAD_DETACHED);
>> -    }
>> -
>> -    conn->wait_co = qemu_coroutine_self();
>> -
>> -    qemu_mutex_unlock(&conn->mutex);
>> -
>> -    /*
>> -     * We are going to wait for connect-thread finish, but
>> -     * nbd_co_establish_connection_cancel() can interrupt.
>> -     */
>> -    qemu_coroutine_yield();
>> -
>> -    qemu_mutex_lock(&conn->mutex);
>> -
>> -    if (conn->running) {
>> -        /*
>> -         * Obviously, drained section wants to start. Report the attempt as
>> -         * failed. Still connect thread is executing in background, and its
>> -         * result may be used for next connection attempt.
>> -         */
>> -        error_setg(errp, "Connection attempt cancelled by other operation");
>> -    } else {
>> -        error_propagate(errp, conn->err);
>> -        conn->err = NULL;
>> -        sioc = g_steal_pointer(&conn->sioc);
>> -    }
>> -
>> -    qemu_mutex_unlock(&conn->mutex);
>> -
>> -    return sioc;
>> -}
>> -
>> -/*
>> - * nbd_co_establish_connection_cancel
>> - * Cancel nbd_co_establish_connection() asynchronously. Note, that it doesn't
>> - * stop the thread itself neither close the socket. It just safely wakes
>> - * nbd_co_establish_connection() sleeping in the yield().
>> - */
>> -static void nbd_co_establish_connection_cancel(NBDClientConnection *conn)
>> -{
>> -    qemu_mutex_lock(&conn->mutex);
>> -
>> -    if (conn->wait_co) {
>> -        aio_co_schedule(NULL, conn->wait_co);
>> -        conn->wait_co = NULL;
>> -    }
>> -
>> -    qemu_mutex_unlock(&conn->mutex);
>> -}
>> -
>>   static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
>>   {
>>       int ret;
>> diff --git a/nbd/client-connection.c b/nbd/client-connection.c
>> new file mode 100644
>> index 0000000000..4e39a5b1af
>> --- /dev/null
>> +++ b/nbd/client-connection.c
>> @@ -0,0 +1,212 @@
>> +/*
>> + * QEMU Block driver for  NBD
>> + *
>> + * Copyright (c) 2021 Virtuozzo International GmbH.
>> + *
>> + * Permission is hereby granted, free of charge, to any person obtaining a copy
>> + * of this software and associated documentation files (the "Software"), to deal
>> + * in the Software without restriction, including without limitation the rights
>> + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
>> + * copies of the Software, and to permit persons to whom the Software is
>> + * furnished to do so, subject to the following conditions:
>> + *
>> + * The above copyright notice and this permission notice shall be included in
>> + * all copies or substantial portions of the Software.
>> + *
>> + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
>> + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
>> + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
>> + * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
>> + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
>> + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
>> + * THE SOFTWARE.
>> + */
>> +
>> +#include "qemu/osdep.h"
>> +
>> +#include "block/nbd.h"
>> +
>> +#include "qapi/qapi-visit-sockets.h"
>> +#include "qapi/clone-visitor.h"
>> +
>> +struct NBDClientConnection {
>> +    /* Initialization constants */
>> +    SocketAddress *saddr; /* address to connect to */
>> +
>> +    /*
>> +     * Result of last attempt. Valid in FAIL and SUCCESS states.
>> +     * If you want to steal error, don't forget to set pointer to NULL.
>> +     */
>> +    QIOChannelSocket *sioc;
>> +    Error *err;
> 
> These two are also manipulated under the mutex.  Consider also updating
> the comment: both these pointers are to be "stolen" by the caller, with
> the former being valid when the connection succeeds and the latter
> otherwise.
> 

Hmm. I should move mutex and "All further" comment above these two fields.

Ok, I'll think on updating the comment (probably as an additional patch, to keep this as a simple movement). I don't like to document that they are stolen by caller(). For me it sounds like caller is user of the interface. And caller of nbd_co_establish_connection() doesn't stole anything: the structure is private now..

> 
>> +
>> +    QemuMutex mutex;
>> +    /* All further fields are protected by mutex */
>> +    bool running; /* thread is running now */
>> +    bool detached; /* thread is detached and should cleanup the state */
>> +    Coroutine *wait_co; /* nbd_co_establish_connection() wait in yield() */
>> +};
>> +
>> +NBDClientConnection *nbd_client_connection_new(const SocketAddress *saddr)
>> +{
>> +    NBDClientConnection *conn = g_new(NBDClientConnection, 1);
>> +
>> +    *conn = (NBDClientConnection) {
>> +        .saddr = QAPI_CLONE(SocketAddress, saddr),
>> +    };
>> +
>> +    qemu_mutex_init(&conn->mutex);
>> +
>> +    return conn;
>> +}
>> +
>> +static void nbd_client_connection_do_free(NBDClientConnection *conn)
>> +{
>> +    if (conn->sioc) {
>> +        qio_channel_close(QIO_CHANNEL(conn->sioc), NULL);
>> +        object_unref(OBJECT(conn->sioc));
>> +    }
>> +    error_free(conn->err);
>> +    qapi_free_SocketAddress(conn->saddr);
>> +    g_free(conn);
>> +}
>> +
>> +static void *connect_thread_func(void *opaque)
>> +{
>> +    NBDClientConnection *conn = opaque;
>> +    bool do_free;
>> +    int ret;
>> +
>> +    conn->sioc = qio_channel_socket_new();
>> +
>> +    error_free(conn->err);
>> +    conn->err = NULL;
>> +    ret = qio_channel_socket_connect_sync(conn->sioc, conn->saddr, &conn->err);
>> +    if (ret < 0) {
>> +        object_unref(OBJECT(conn->sioc));
>> +        conn->sioc = NULL;
>> +    }
>> +
>> +    qemu_mutex_lock(&conn->mutex);
>> +
>> +    assert(conn->running);
>> +    conn->running = false;
>> +    if (conn->wait_co) {
>> +        aio_co_schedule(NULL, conn->wait_co);
>> +        conn->wait_co = NULL;
>> +    }
>> +    do_free = conn->detached;
>> +
>> +    qemu_mutex_unlock(&conn->mutex);
>> +
>> +    if (do_free) {
>> +        nbd_client_connection_do_free(conn);
>> +    }
>> +
>> +    return NULL;
>> +}
>> +
>> +void nbd_client_connection_release(NBDClientConnection *conn)
>> +{
>> +    bool do_free;
>> +
>> +    if (!conn) {
>> +        return;
>> +    }
>> +
>> +    qemu_mutex_lock(&conn->mutex);
>> +    if (conn->running) {
>> +        conn->detached = true;
>> +    }
>> +    do_free = !conn->running && !conn->detached;
>> +    qemu_mutex_unlock(&conn->mutex);
>> +
>> +    if (do_free) {
>> +        nbd_client_connection_do_free(conn);
>> +    }
>> +}
>> +
>> +/*
>> + * Get a new connection in context of @conn:
>> + *   if thread is running, wait for completion
>> + *   if thread is already succeeded in background, and user didn't get the
>> + *     result, just return it now
>> + *   otherwise if thread is not running, start a thread and wait for completion
>> + */
>> +QIOChannelSocket *coroutine_fn
>> +nbd_co_establish_connection(NBDClientConnection *conn, Error **errp)
>> +{
>> +    QIOChannelSocket *sioc = NULL;
>> +    QemuThread thread;
>> +
>> +    qemu_mutex_lock(&conn->mutex);
>> +
>> +    /*
>> +     * Don't call nbd_co_establish_connection() in several coroutines in
>> +     * parallel. Only one call at once is supported.
>> +     */
>> +    assert(!conn->wait_co);
>> +
>> +    if (!conn->running) {
>> +        if (conn->sioc) {
>> +            /* Previous attempt finally succeeded in background */
>> +            sioc = g_steal_pointer(&conn->sioc);
>> +            qemu_mutex_unlock(&conn->mutex);
>> +
>> +            return sioc;
>> +        }
>> +
>> +        conn->running = true;
>> +        error_free(conn->err);
>> +        conn->err = NULL;
>> +        qemu_thread_create(&thread, "nbd-connect",
>> +                           connect_thread_func, conn, QEMU_THREAD_DETACHED);
>> +    }
>> +
>> +    conn->wait_co = qemu_coroutine_self();
>> +
>> +    qemu_mutex_unlock(&conn->mutex);
>> +
>> +    /*
>> +     * We are going to wait for connect-thread finish, but
>> +     * nbd_co_establish_connection_cancel() can interrupt.
>> +     */
>> +    qemu_coroutine_yield();
>> +
>> +    qemu_mutex_lock(&conn->mutex);
>> +
>> +    if (conn->running) {
>> +        /*
>> +         * Obviously, drained section wants to start. Report the attempt as
>> +         * failed. Still connect thread is executing in background, and its
>> +         * result may be used for next connection attempt.
>> +         */
>> +        error_setg(errp, "Connection attempt cancelled by other operation");
>> +    } else {
>> +        error_propagate(errp, conn->err);
>> +        conn->err = NULL;
>> +        sioc = g_steal_pointer(&conn->sioc);
>> +    }
>> +
>> +    qemu_mutex_unlock(&conn->mutex);
>> +
>> +    return sioc;
>> +}
>> +
>> +/*
>> + * nbd_co_establish_connection_cancel
>> + * Cancel nbd_co_establish_connection() asynchronously. Note, that it doesn't
>> + * stop the thread itself neither close the socket. It just safely wakes
>> + * nbd_co_establish_connection() sleeping in the yield().
>> + */
>> +void coroutine_fn nbd_co_establish_connection_cancel(NBDClientConnection *conn)
>> +{
>> +    qemu_mutex_lock(&conn->mutex);
>> +
>> +    if (conn->wait_co) {
>> +        aio_co_schedule(NULL, conn->wait_co);
>> +        conn->wait_co = NULL;
>> +    }
>> +
>> +    qemu_mutex_unlock(&conn->mutex);
>> +}
>> diff --git a/nbd/meson.build b/nbd/meson.build
>> index 2baaa36948..b26d70565e 100644
>> --- a/nbd/meson.build
>> +++ b/nbd/meson.build
>> @@ -1,5 +1,6 @@
>>   block_ss.add(files(
>>     'client.c',
>> +  'client-connection.c',
>>     'common.c',
>>   ))
>>   blockdev_ss.add(files(
>> -- 
>> 2.29.2
>>
diff mbox series

Patch

diff --git a/include/block/nbd.h b/include/block/nbd.h
index 5f34d23bb0..57381be76f 100644
--- a/include/block/nbd.h
+++ b/include/block/nbd.h
@@ -406,4 +406,15 @@  const char *nbd_info_lookup(uint16_t info);
 const char *nbd_cmd_lookup(uint16_t info);
 const char *nbd_err_lookup(int err);
 
+/* nbd/client-connection.c */
+typedef struct NBDClientConnection NBDClientConnection;
+
+NBDClientConnection *nbd_client_connection_new(const SocketAddress *saddr);
+void nbd_client_connection_release(NBDClientConnection *conn);
+
+QIOChannelSocket *coroutine_fn
+nbd_co_establish_connection(NBDClientConnection *conn, Error **errp);
+
+void coroutine_fn nbd_co_establish_connection_cancel(NBDClientConnection *conn);
+
 #endif
diff --git a/block/nbd.c b/block/nbd.c
index 8531d019b2..9bd68dcf10 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -66,24 +66,6 @@  typedef enum NBDClientState {
     NBD_CLIENT_QUIT
 } NBDClientState;
 
-typedef struct NBDClientConnection {
-    /* Initialization constants */
-    SocketAddress *saddr; /* address to connect to */
-
-    /*
-     * Result of last attempt. Valid in FAIL and SUCCESS states.
-     * If you want to steal error, don't forget to set pointer to NULL.
-     */
-    QIOChannelSocket *sioc;
-    Error *err;
-
-    QemuMutex mutex;
-    /* All further fields are protected by mutex */
-    bool running; /* thread is running now */
-    bool detached; /* thread is detached and should cleanup the state */
-    Coroutine *wait_co; /* nbd_co_establish_connection() wait in yield() */
-} NBDClientConnection;
-
 typedef struct BDRVNBDState {
     QIOChannelSocket *sioc; /* The master data channel */
     QIOChannel *ioc; /* The current I/O channel which may differ (eg TLS) */
@@ -118,12 +100,8 @@  typedef struct BDRVNBDState {
     NBDClientConnection *conn;
 } BDRVNBDState;
 
-static void nbd_client_connection_release(NBDClientConnection *conn);
 static int nbd_establish_connection(BlockDriverState *bs, SocketAddress *saddr,
                                     Error **errp);
-static coroutine_fn QIOChannelSocket *
-nbd_co_establish_connection(NBDClientConnection *conn, Error **errp);
-static void nbd_co_establish_connection_cancel(NBDClientConnection *conn);
 static int nbd_client_handshake(BlockDriverState *bs, Error **errp);
 static void nbd_yank(void *opaque);
 
@@ -340,171 +318,6 @@  static bool nbd_client_connecting_wait(BDRVNBDState *s)
     return qatomic_load_acquire(&s->state) == NBD_CLIENT_CONNECTING_WAIT;
 }
 
-static NBDClientConnection *
-nbd_client_connection_new(const SocketAddress *saddr)
-{
-    NBDClientConnection *conn = g_new(NBDClientConnection, 1);
-
-    *conn = (NBDClientConnection) {
-        .saddr = QAPI_CLONE(SocketAddress, saddr),
-    };
-
-    qemu_mutex_init(&conn->mutex);
-
-    return conn;
-}
-
-static void nbd_client_connection_do_free(NBDClientConnection *conn)
-{
-    if (conn->sioc) {
-        qio_channel_close(QIO_CHANNEL(conn->sioc), NULL);
-        object_unref(OBJECT(conn->sioc));
-    }
-    error_free(conn->err);
-    qapi_free_SocketAddress(conn->saddr);
-    g_free(conn);
-}
-
-static void *connect_thread_func(void *opaque)
-{
-    NBDClientConnection *conn = opaque;
-    bool do_free;
-    int ret;
-
-    conn->sioc = qio_channel_socket_new();
-
-    error_free(conn->err);
-    conn->err = NULL;
-    ret = qio_channel_socket_connect_sync(conn->sioc, conn->saddr, &conn->err);
-    if (ret < 0) {
-        object_unref(OBJECT(conn->sioc));
-        conn->sioc = NULL;
-    }
-
-    qemu_mutex_lock(&conn->mutex);
-
-    assert(conn->running);
-    conn->running = false;
-    if (conn->wait_co) {
-        aio_co_schedule(NULL, conn->wait_co);
-        conn->wait_co = NULL;
-    }
-    do_free = conn->detached;
-
-    qemu_mutex_unlock(&conn->mutex);
-
-    if (do_free) {
-        nbd_client_connection_do_free(conn);
-    }
-
-    return NULL;
-}
-
-static void nbd_client_connection_release(NBDClientConnection *conn)
-{
-    bool do_free;
-
-    if (!conn) {
-        return;
-    }
-
-    qemu_mutex_lock(&conn->mutex);
-    if (conn->running) {
-        conn->detached = true;
-    }
-    do_free = !conn->running && !conn->detached;
-    qemu_mutex_unlock(&conn->mutex);
-
-    if (do_free) {
-        nbd_client_connection_do_free(conn);
-    }
-}
-
-/*
- * Get a new connection in context of @conn:
- *   if thread is running, wait for completion
- *   if thread is already succeeded in background, and user didn't get the
- *     result, just return it now
- *   otherwise if thread is not running, start a thread and wait for completion
- */
-static coroutine_fn QIOChannelSocket *
-nbd_co_establish_connection(NBDClientConnection *conn, Error **errp)
-{
-    QIOChannelSocket *sioc = NULL;
-    QemuThread thread;
-
-    qemu_mutex_lock(&conn->mutex);
-
-    /*
-     * Don't call nbd_co_establish_connection() in several coroutines in
-     * parallel. Only one call at once is supported.
-     */
-    assert(!conn->wait_co);
-
-    if (!conn->running) {
-        if (conn->sioc) {
-            /* Previous attempt finally succeeded in background */
-            sioc = g_steal_pointer(&conn->sioc);
-            qemu_mutex_unlock(&conn->mutex);
-
-            return sioc;
-        }
-
-        conn->running = true;
-        error_free(conn->err);
-        conn->err = NULL;
-        qemu_thread_create(&thread, "nbd-connect",
-                           connect_thread_func, conn, QEMU_THREAD_DETACHED);
-    }
-
-    conn->wait_co = qemu_coroutine_self();
-
-    qemu_mutex_unlock(&conn->mutex);
-
-    /*
-     * We are going to wait for connect-thread finish, but
-     * nbd_co_establish_connection_cancel() can interrupt.
-     */
-    qemu_coroutine_yield();
-
-    qemu_mutex_lock(&conn->mutex);
-
-    if (conn->running) {
-        /*
-         * Obviously, drained section wants to start. Report the attempt as
-         * failed. Still connect thread is executing in background, and its
-         * result may be used for next connection attempt.
-         */
-        error_setg(errp, "Connection attempt cancelled by other operation");
-    } else {
-        error_propagate(errp, conn->err);
-        conn->err = NULL;
-        sioc = g_steal_pointer(&conn->sioc);
-    }
-
-    qemu_mutex_unlock(&conn->mutex);
-
-    return sioc;
-}
-
-/*
- * nbd_co_establish_connection_cancel
- * Cancel nbd_co_establish_connection() asynchronously. Note, that it doesn't
- * stop the thread itself neither close the socket. It just safely wakes
- * nbd_co_establish_connection() sleeping in the yield().
- */
-static void nbd_co_establish_connection_cancel(NBDClientConnection *conn)
-{
-    qemu_mutex_lock(&conn->mutex);
-
-    if (conn->wait_co) {
-        aio_co_schedule(NULL, conn->wait_co);
-        conn->wait_co = NULL;
-    }
-
-    qemu_mutex_unlock(&conn->mutex);
-}
-
 static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
 {
     int ret;
diff --git a/nbd/client-connection.c b/nbd/client-connection.c
new file mode 100644
index 0000000000..4e39a5b1af
--- /dev/null
+++ b/nbd/client-connection.c
@@ -0,0 +1,212 @@ 
+/*
+ * QEMU Block driver for  NBD
+ *
+ * Copyright (c) 2021 Virtuozzo International GmbH.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#include "qemu/osdep.h"
+
+#include "block/nbd.h"
+
+#include "qapi/qapi-visit-sockets.h"
+#include "qapi/clone-visitor.h"
+
+struct NBDClientConnection {
+    /* Initialization constants */
+    SocketAddress *saddr; /* address to connect to */
+
+    /*
+     * Result of last attempt. Valid in FAIL and SUCCESS states.
+     * If you want to steal error, don't forget to set pointer to NULL.
+     */
+    QIOChannelSocket *sioc;
+    Error *err;
+
+    QemuMutex mutex;
+    /* All further fields are protected by mutex */
+    bool running; /* thread is running now */
+    bool detached; /* thread is detached and should cleanup the state */
+    Coroutine *wait_co; /* nbd_co_establish_connection() wait in yield() */
+};
+
+NBDClientConnection *nbd_client_connection_new(const SocketAddress *saddr)
+{
+    NBDClientConnection *conn = g_new(NBDClientConnection, 1);
+
+    *conn = (NBDClientConnection) {
+        .saddr = QAPI_CLONE(SocketAddress, saddr),
+    };
+
+    qemu_mutex_init(&conn->mutex);
+
+    return conn;
+}
+
+static void nbd_client_connection_do_free(NBDClientConnection *conn)
+{
+    if (conn->sioc) {
+        qio_channel_close(QIO_CHANNEL(conn->sioc), NULL);
+        object_unref(OBJECT(conn->sioc));
+    }
+    error_free(conn->err);
+    qapi_free_SocketAddress(conn->saddr);
+    g_free(conn);
+}
+
+static void *connect_thread_func(void *opaque)
+{
+    NBDClientConnection *conn = opaque;
+    bool do_free;
+    int ret;
+
+    conn->sioc = qio_channel_socket_new();
+
+    error_free(conn->err);
+    conn->err = NULL;
+    ret = qio_channel_socket_connect_sync(conn->sioc, conn->saddr, &conn->err);
+    if (ret < 0) {
+        object_unref(OBJECT(conn->sioc));
+        conn->sioc = NULL;
+    }
+
+    qemu_mutex_lock(&conn->mutex);
+
+    assert(conn->running);
+    conn->running = false;
+    if (conn->wait_co) {
+        aio_co_schedule(NULL, conn->wait_co);
+        conn->wait_co = NULL;
+    }
+    do_free = conn->detached;
+
+    qemu_mutex_unlock(&conn->mutex);
+
+    if (do_free) {
+        nbd_client_connection_do_free(conn);
+    }
+
+    return NULL;
+}
+
+void nbd_client_connection_release(NBDClientConnection *conn)
+{
+    bool do_free;
+
+    if (!conn) {
+        return;
+    }
+
+    qemu_mutex_lock(&conn->mutex);
+    if (conn->running) {
+        conn->detached = true;
+    }
+    do_free = !conn->running && !conn->detached;
+    qemu_mutex_unlock(&conn->mutex);
+
+    if (do_free) {
+        nbd_client_connection_do_free(conn);
+    }
+}
+
+/*
+ * Get a new connection in context of @conn:
+ *   if thread is running, wait for completion
+ *   if thread is already succeeded in background, and user didn't get the
+ *     result, just return it now
+ *   otherwise if thread is not running, start a thread and wait for completion
+ */
+QIOChannelSocket *coroutine_fn
+nbd_co_establish_connection(NBDClientConnection *conn, Error **errp)
+{
+    QIOChannelSocket *sioc = NULL;
+    QemuThread thread;
+
+    qemu_mutex_lock(&conn->mutex);
+
+    /*
+     * Don't call nbd_co_establish_connection() in several coroutines in
+     * parallel. Only one call at once is supported.
+     */
+    assert(!conn->wait_co);
+
+    if (!conn->running) {
+        if (conn->sioc) {
+            /* Previous attempt finally succeeded in background */
+            sioc = g_steal_pointer(&conn->sioc);
+            qemu_mutex_unlock(&conn->mutex);
+
+            return sioc;
+        }
+
+        conn->running = true;
+        error_free(conn->err);
+        conn->err = NULL;
+        qemu_thread_create(&thread, "nbd-connect",
+                           connect_thread_func, conn, QEMU_THREAD_DETACHED);
+    }
+
+    conn->wait_co = qemu_coroutine_self();
+
+    qemu_mutex_unlock(&conn->mutex);
+
+    /*
+     * We are going to wait for connect-thread finish, but
+     * nbd_co_establish_connection_cancel() can interrupt.
+     */
+    qemu_coroutine_yield();
+
+    qemu_mutex_lock(&conn->mutex);
+
+    if (conn->running) {
+        /*
+         * Obviously, drained section wants to start. Report the attempt as
+         * failed. Still connect thread is executing in background, and its
+         * result may be used for next connection attempt.
+         */
+        error_setg(errp, "Connection attempt cancelled by other operation");
+    } else {
+        error_propagate(errp, conn->err);
+        conn->err = NULL;
+        sioc = g_steal_pointer(&conn->sioc);
+    }
+
+    qemu_mutex_unlock(&conn->mutex);
+
+    return sioc;
+}
+
+/*
+ * nbd_co_establish_connection_cancel
+ * Cancel nbd_co_establish_connection() asynchronously. Note, that it doesn't
+ * stop the thread itself neither close the socket. It just safely wakes
+ * nbd_co_establish_connection() sleeping in the yield().
+ */
+void coroutine_fn nbd_co_establish_connection_cancel(NBDClientConnection *conn)
+{
+    qemu_mutex_lock(&conn->mutex);
+
+    if (conn->wait_co) {
+        aio_co_schedule(NULL, conn->wait_co);
+        conn->wait_co = NULL;
+    }
+
+    qemu_mutex_unlock(&conn->mutex);
+}
diff --git a/nbd/meson.build b/nbd/meson.build
index 2baaa36948..b26d70565e 100644
--- a/nbd/meson.build
+++ b/nbd/meson.build
@@ -1,5 +1,6 @@ 
 block_ss.add(files(
   'client.c',
+  'client-connection.c',
   'common.c',
 ))
 blockdev_ss.add(files(