diff mbox series

[v14,15/17] net: stream: move to QIO to enable additional parameters

Message ID 20221021090922.170074-16-lvivier@redhat.com (mailing list archive)
State New, archived
Headers show
Series qapi: net: add unix socket type support to netdev backend | expand

Commit Message

Laurent Vivier Oct. 21, 2022, 9:09 a.m. UTC
Use QIOChannel, QIOChannelSocket and QIONetListener.
This allows net/stream to use all the available parameters provided by
SocketAddress.

Signed-off-by: Laurent Vivier <lvivier@redhat.com>
Acked-by: Michael S. Tsirkin <mst@redhat.com>
---
 net/stream.c    | 492 +++++++++++++++++-------------------------------
 qemu-options.hx |   4 +-
 2 files changed, 178 insertions(+), 318 deletions(-)

Comments

Philippe Mathieu-Daudé Oct. 21, 2022, 10:05 a.m. UTC | #1
On 21/10/22 11:09, Laurent Vivier wrote:
> Use QIOChannel, QIOChannelSocket and QIONetListener.
> This allows net/stream to use all the available parameters provided by
> SocketAddress.
> 
> Signed-off-by: Laurent Vivier <lvivier@redhat.com>
> Acked-by: Michael S. Tsirkin <mst@redhat.com>
> ---
>   net/stream.c    | 492 +++++++++++++++++-------------------------------
>   qemu-options.hx |   4 +-
>   2 files changed, 178 insertions(+), 318 deletions(-)

> -static void net_stream_accept(void *opaque)
> +static void net_stream_server_listening(QIOTask *task, gpointer opaque)
>   {
>       NetStreamState *s = opaque;
> -    struct sockaddr_storage saddr;
> -    socklen_t len;
> -    int fd;
> -
> -    for (;;) {
> -        len = sizeof(saddr);
> -        fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len);
> -        if (fd < 0 && errno != EINTR) {
> -            return;
> -        } else if (fd >= 0) {
> -            qemu_set_fd_handler(s->listen_fd, NULL, NULL, NULL);
> -            break;
> -        }
> -    }
> +    QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(s->listen_ioc);
> +    SocketAddress *addr;
> +    int ret;
>   
> -    s->fd = fd;
> -    s->nc.link_down = false;
> -    net_stream_connect(s);
> -    switch (saddr.ss_family) {
> -    case AF_INET: {
> -        struct sockaddr_in *saddr_in = (struct sockaddr_in *)&saddr;
> -
> -        qemu_set_info_str(&s->nc, "connection from %s:%d",
> -                          inet_ntoa(saddr_in->sin_addr),
> -                          ntohs(saddr_in->sin_port));
> -        break;
> +    if (listen_sioc->fd < 0) {
> +        qemu_set_info_str(&s->nc, "connection error");
> +        return;
>       }
> -    case AF_UNIX: {
> -        struct sockaddr_un saddr_un;
>   
> -        len = sizeof(saddr_un);
> -        getsockname(s->listen_fd, (struct sockaddr *)&saddr_un, &len);
> -        qemu_set_info_str(&s->nc, "connect from %s", saddr_un.sun_path);
> -        break;
> -    }
> -    default:
> -        g_assert_not_reached();
> +    addr = qio_channel_socket_get_local_address(listen_sioc, NULL);
> +    g_assert(addr != NULL);

Missing propagating Error* (observed in v12).

> +    ret = qemu_socket_try_set_nonblock(listen_sioc->fd);
> +    if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
> +        qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
> +                          addr->u.fd.str, -ret);
> +        return;
>       }
> +    g_assert(ret == 0);
> +    qapi_free_SocketAddress(addr);
> +
> +    s->nc.link_down = true;
> +    s->listener = qio_net_listener_new();
> +
> +    net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
> +    qio_net_listener_set_client_func(s->listener, net_stream_listen, s, NULL);
> +    qio_net_listener_add(s->listener, listen_sioc);
>   }
>   
>   static int net_stream_server_init(NetClientState *peer,
> @@ -283,105 +286,61 @@ static int net_stream_server_init(NetClientState *peer,
>   {
>       NetClientState *nc;
>       NetStreamState *s;
> -    int fd, ret;
> +    QIOChannelSocket *listen_sioc = qio_channel_socket_new();
>   
> -    switch (addr->type) {
> -    case SOCKET_ADDRESS_TYPE_INET: {
> -        struct sockaddr_in saddr_in;
> -
> -        if (convert_host_port(&saddr_in, addr->u.inet.host, addr->u.inet.port,
> -                              errp) < 0) {
> -            return -1;
> -        }
> -
> -        fd = qemu_socket(PF_INET, SOCK_STREAM, 0);
> -        if (fd < 0) {
> -            error_setg_errno(errp, errno, "can't create stream socket");
> -            return -1;
> -        }
> -        qemu_socket_set_nonblock(fd);
> +    nc = qemu_new_net_client(&net_stream_info, peer, model, name);
> +    s = DO_UPCAST(NetStreamState, nc, nc);
>   
> -        socket_set_fast_reuse(fd);
> +    s->listen_ioc = QIO_CHANNEL(listen_sioc);
> +    qio_channel_socket_listen_async(listen_sioc, addr, 0,
> +                                    net_stream_server_listening, s,
> +                                    NULL, NULL);
>   
> -        ret = bind(fd, (struct sockaddr *)&saddr_in, sizeof(saddr_in));
> -        if (ret < 0) {
> -            error_setg_errno(errp, errno, "can't bind ip=%s to socket",
> -                             inet_ntoa(saddr_in.sin_addr));
> -            closesocket(fd);
> -            return -1;
> -        }
> -        break;
> -    }
> -    case SOCKET_ADDRESS_TYPE_UNIX: {
> -        struct sockaddr_un saddr_un;
> -
> -        ret = unlink(addr->u.q_unix.path);
> -        if (ret < 0 && errno != ENOENT) {
> -            error_setg_errno(errp, errno, "failed to unlink socket %s",
> -                             addr->u.q_unix.path);
> -            return -1;
> -        }
> +    return 0;
> +}
>   
> -        saddr_un.sun_family = PF_UNIX;
> -        ret = snprintf(saddr_un.sun_path, sizeof(saddr_un.sun_path), "%s",
> -                       addr->u.q_unix.path);
> -        if (ret < 0 || ret >= sizeof(saddr_un.sun_path)) {
> -            error_setg(errp, "UNIX socket path '%s' is too long",
> -                       addr->u.q_unix.path);
> -            error_append_hint(errp, "Path must be less than %zu bytes\n",
> -                              sizeof(saddr_un.sun_path));
> -            return -1;
> -        }
> +static void net_stream_client_connected(QIOTask *task, gpointer opaque)
> +{
> +    NetStreamState *s = opaque;
> +    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(s->ioc);
> +    SocketAddress *addr;
> +    gchar *uri;
> +    int ret;
>   
> -        fd = qemu_socket(PF_UNIX, SOCK_STREAM, 0);
> -        if (fd < 0) {
> -            error_setg_errno(errp, errno, "can't create stream socket");
> -            return -1;
> -        }
> -        qemu_socket_set_nonblock(fd);
> -
> -        ret = bind(fd, (struct sockaddr *)&saddr_un, sizeof(saddr_un));
> -        if (ret < 0) {
> -            error_setg_errno(errp, errno, "can't create socket with path: %s",
> -                             saddr_un.sun_path);
> -            closesocket(fd);
> -            return -1;
> -        }
> -        break;
> -    }
> -    case SOCKET_ADDRESS_TYPE_FD:
> -        fd = monitor_fd_param(monitor_cur(), addr->u.fd.str, errp);
> -        if (fd == -1) {
> -            return -1;
> -        }
> -        ret = qemu_socket_try_set_nonblock(fd);
> -        if (ret < 0) {
> -            error_setg_errno(errp, -ret, "%s: Can't use file descriptor %d",
> -                             name, fd);
> -            return -1;
> -        }
> -        break;
> -    default:
> -        error_setg(errp, "only support inet or fd type");
> -        return -1;
> +    if (sioc->fd < 0) {
> +        qemu_set_info_str(&s->nc, "connection error");
> +        goto error;
>       }
>   
> -    ret = listen(fd, 0);
> -    if (ret < 0) {
> -        error_setg_errno(errp, errno, "can't listen on socket");
> -        closesocket(fd);
> -        return -1;
> +    addr = qio_channel_socket_get_remote_address(sioc, NULL);
> +    g_assert(addr != NULL);

Ditto (Error*).

> +    uri = socket_uri(addr);
> +    qemu_set_info_str(&s->nc, uri);
> +    g_free(uri);
> +
> +    ret = qemu_socket_try_set_nonblock(sioc->fd);
> +    if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
> +        qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
> +                          addr->u.fd.str, -ret);
> +        qapi_free_SocketAddress(addr);
> +        goto error;
>       }
> +    g_assert(ret == 0);
>   
> -    nc = qemu_new_net_client(&net_stream_info, peer, model, name);
> -    s = DO_UPCAST(NetStreamState, nc, nc);
> -    s->fd = -1;
> -    s->listen_fd = fd;
> -    s->nc.link_down = true;
>       net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
>   
> -    qemu_set_fd_handler(s->listen_fd, net_stream_accept, NULL, s);
> -    return 0;
> +    /* Disable Nagle algorithm on TCP sockets to reduce latency */
> +    qio_channel_set_delay(s->ioc, false);
> +
> +    s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
> +                                            s, NULL);
> +    s->nc.link_down = false;
> +    qapi_free_SocketAddress(addr);
> +
> +    return;
> +error:
> +    object_unref(OBJECT(s->ioc));
> +    s->ioc = NULL;
>   }
Markus Armbruster Oct. 21, 2022, 10:35 a.m. UTC | #2
Philippe Mathieu-Daudé <philmd@linaro.org> writes:

> On 21/10/22 11:09, Laurent Vivier wrote:
>> Use QIOChannel, QIOChannelSocket and QIONetListener.
>> This allows net/stream to use all the available parameters provided by
>> SocketAddress.
>> Signed-off-by: Laurent Vivier <lvivier@redhat.com>
>> Acked-by: Michael S. Tsirkin <mst@redhat.com>
>> ---
>>   net/stream.c    | 492 +++++++++++++++++-------------------------------
>>   qemu-options.hx |   4 +-
>>   2 files changed, 178 insertions(+), 318 deletions(-)
>
>> -static void net_stream_accept(void *opaque)
>> +static void net_stream_server_listening(QIOTask *task, gpointer opaque)
>>   {
>>       NetStreamState *s = opaque;
>> -    struct sockaddr_storage saddr;
>> -    socklen_t len;
>> -    int fd;
>> -
>> -    for (;;) {
>> -        len = sizeof(saddr);
>> -        fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len);
>> -        if (fd < 0 && errno != EINTR) {
>> -            return;
>> -        } else if (fd >= 0) {
>> -            qemu_set_fd_handler(s->listen_fd, NULL, NULL, NULL);
>> -            break;
>> -        }
>> -    }
>> +    QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(s->listen_ioc);
>> +    SocketAddress *addr;
>> +    int ret;
>>   -    s->fd = fd;
>> -    s->nc.link_down = false;
>> -    net_stream_connect(s);
>> -    switch (saddr.ss_family) {
>> -    case AF_INET: {
>> -        struct sockaddr_in *saddr_in = (struct sockaddr_in *)&saddr;
>> -
>> -        qemu_set_info_str(&s->nc, "connection from %s:%d",
>> -                          inet_ntoa(saddr_in->sin_addr),
>> -                          ntohs(saddr_in->sin_port));
>> -        break;
>> +    if (listen_sioc->fd < 0) {
>> +        qemu_set_info_str(&s->nc, "connection error");
>> +        return;
>>       }
>> -    case AF_UNIX: {
>> -        struct sockaddr_un saddr_un;
>>   -        len = sizeof(saddr_un);
>> -        getsockname(s->listen_fd, (struct sockaddr *)&saddr_un, &len);
>> -        qemu_set_info_str(&s->nc, "connect from %s", saddr_un.sun_path);
>> -        break;
>> -    }
>> -    default:
>> -        g_assert_not_reached();
>> +    addr = qio_channel_socket_get_local_address(listen_sioc, NULL);
>> +    g_assert(addr != NULL);
>
> Missing propagating Error* (observed in v12).

*If* this is really a programming error: what about &error_abort?

[...]
Laurent Vivier Oct. 21, 2022, 10:43 a.m. UTC | #3
On 10/21/22 12:35, Markus Armbruster wrote:
> Philippe Mathieu-Daudé <philmd@linaro.org> writes:
> 
>> On 21/10/22 11:09, Laurent Vivier wrote:
>>> Use QIOChannel, QIOChannelSocket and QIONetListener.
>>> This allows net/stream to use all the available parameters provided by
>>> SocketAddress.
>>> Signed-off-by: Laurent Vivier <lvivier@redhat.com>
>>> Acked-by: Michael S. Tsirkin <mst@redhat.com>
>>> ---
>>>    net/stream.c    | 492 +++++++++++++++++-------------------------------
>>>    qemu-options.hx |   4 +-
>>>    2 files changed, 178 insertions(+), 318 deletions(-)
>>
>>> -static void net_stream_accept(void *opaque)
>>> +static void net_stream_server_listening(QIOTask *task, gpointer opaque)
>>>    {
>>>        NetStreamState *s = opaque;
>>> -    struct sockaddr_storage saddr;
>>> -    socklen_t len;
>>> -    int fd;
>>> -
>>> -    for (;;) {
>>> -        len = sizeof(saddr);
>>> -        fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len);
>>> -        if (fd < 0 && errno != EINTR) {
>>> -            return;
>>> -        } else if (fd >= 0) {
>>> -            qemu_set_fd_handler(s->listen_fd, NULL, NULL, NULL);
>>> -            break;
>>> -        }
>>> -    }
>>> +    QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(s->listen_ioc);
>>> +    SocketAddress *addr;
>>> +    int ret;
>>>    -    s->fd = fd;
>>> -    s->nc.link_down = false;
>>> -    net_stream_connect(s);
>>> -    switch (saddr.ss_family) {
>>> -    case AF_INET: {
>>> -        struct sockaddr_in *saddr_in = (struct sockaddr_in *)&saddr;
>>> -
>>> -        qemu_set_info_str(&s->nc, "connection from %s:%d",
>>> -                          inet_ntoa(saddr_in->sin_addr),
>>> -                          ntohs(saddr_in->sin_port));
>>> -        break;
>>> +    if (listen_sioc->fd < 0) {
>>> +        qemu_set_info_str(&s->nc, "connection error");
>>> +        return;
>>>        }
>>> -    case AF_UNIX: {
>>> -        struct sockaddr_un saddr_un;
>>>    -        len = sizeof(saddr_un);
>>> -        getsockname(s->listen_fd, (struct sockaddr *)&saddr_un, &len);
>>> -        qemu_set_info_str(&s->nc, "connect from %s", saddr_un.sun_path);
>>> -        break;
>>> -    }
>>> -    default:
>>> -        g_assert_not_reached();
>>> +    addr = qio_channel_socket_get_local_address(listen_sioc, NULL);
>>> +    g_assert(addr != NULL);
>>
>> Missing propagating Error* (observed in v12).
> 
> *If* this is really a programming error: what about &error_abort?

assert() informs the compiler that following code will not use addr with a NULL value, I 
don't think &error_abort does that. This could avoid an error report in code static analyzer.

Thanks,
Laurent
Markus Armbruster Oct. 21, 2022, 11:31 a.m. UTC | #4
Laurent Vivier <lvivier@redhat.com> writes:

> On 10/21/22 12:35, Markus Armbruster wrote:
>> Philippe Mathieu-Daudé <philmd@linaro.org> writes:
>> 
>>> On 21/10/22 11:09, Laurent Vivier wrote:
>>>> Use QIOChannel, QIOChannelSocket and QIONetListener.
>>>> This allows net/stream to use all the available parameters provided by
>>>> SocketAddress.
>>>> Signed-off-by: Laurent Vivier <lvivier@redhat.com>
>>>> Acked-by: Michael S. Tsirkin <mst@redhat.com>
>>>> ---
>>>>    net/stream.c    | 492 +++++++++++++++++-------------------------------
>>>>    qemu-options.hx |   4 +-
>>>>    2 files changed, 178 insertions(+), 318 deletions(-)
>>>
>>>> -static void net_stream_accept(void *opaque)
>>>> +static void net_stream_server_listening(QIOTask *task, gpointer opaque)
>>>>    {
>>>>        NetStreamState *s = opaque;
>>>> -    struct sockaddr_storage saddr;
>>>> -    socklen_t len;
>>>> -    int fd;
>>>> -
>>>> -    for (;;) {
>>>> -        len = sizeof(saddr);
>>>> -        fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len);
>>>> -        if (fd < 0 && errno != EINTR) {
>>>> -            return;
>>>> -        } else if (fd >= 0) {
>>>> -            qemu_set_fd_handler(s->listen_fd, NULL, NULL, NULL);
>>>> -            break;
>>>> -        }
>>>> -    }
>>>> +    QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(s->listen_ioc);
>>>> +    SocketAddress *addr;
>>>> +    int ret;
>>>> -    s->fd = fd;
>>>> -    s->nc.link_down = false;
>>>> -    net_stream_connect(s);
>>>> -    switch (saddr.ss_family) {
>>>> -    case AF_INET: {
>>>> -        struct sockaddr_in *saddr_in = (struct sockaddr_in *)&saddr;
>>>> -
>>>> -        qemu_set_info_str(&s->nc, "connection from %s:%d",
>>>> -                          inet_ntoa(saddr_in->sin_addr),
>>>> -                          ntohs(saddr_in->sin_port));
>>>> -        break;
>>>> +    if (listen_sioc->fd < 0) {
>>>> +        qemu_set_info_str(&s->nc, "connection error");
>>>> +        return;
>>>>        }
>>>> -    case AF_UNIX: {
>>>> -        struct sockaddr_un saddr_un;
>>>> -        len = sizeof(saddr_un);
>>>> -        getsockname(s->listen_fd, (struct sockaddr *)&saddr_un, &len);
>>>> -        qemu_set_info_str(&s->nc, "connect from %s", saddr_un.sun_path);
>>>> -        break;
>>>> -    }
>>>> -    default:
>>>> -        g_assert_not_reached();
>>>> +    addr = qio_channel_socket_get_local_address(listen_sioc, NULL);
>>>> +    g_assert(addr != NULL);
>>>
>>> Missing propagating Error* (observed in v12).
>> 
>> *If* this is really a programming error: what about &error_abort?
>
> assert() informs the compiler that following code will not use addr with a NULL value, I 
> don't think &error_abort does that. This could avoid an error report in code static analyzer.

I'd expect Coverity to see right through it.

Static analyzers with a less global view won't, of course.

For what it's worth, there are about a thousand uses of &error_abort
outside tests/.  I'm not aware of them confusing static analyzers we
care about.

I like &error_abort, because it makes the program crash when we try to
put the error into &error_abort, with an informative message.  This is
often right where things go wrong[*].  I personally don't care much
about the better message, but others do.  The better stack backtrace has
been quite useful to me.

Let's use &error_abort, and throw in the assert when a static analyzer
we care about needs it.


[*] error_propagate() messes this up.  That's why the comments in
error.h ask you to do without when practical.
Philippe Mathieu-Daudé Oct. 21, 2022, 1:20 p.m. UTC | #5
On 21/10/22 13:31, Markus Armbruster wrote:
> Laurent Vivier <lvivier@redhat.com> writes:
> 
>> On 10/21/22 12:35, Markus Armbruster wrote:
>>> Philippe Mathieu-Daudé <philmd@linaro.org> writes:
>>>
>>>> On 21/10/22 11:09, Laurent Vivier wrote:
>>>>> Use QIOChannel, QIOChannelSocket and QIONetListener.
>>>>> This allows net/stream to use all the available parameters provided by
>>>>> SocketAddress.
>>>>> Signed-off-by: Laurent Vivier <lvivier@redhat.com>
>>>>> Acked-by: Michael S. Tsirkin <mst@redhat.com>
>>>>> ---
>>>>>     net/stream.c    | 492 +++++++++++++++++-------------------------------
>>>>>     qemu-options.hx |   4 +-
>>>>>     2 files changed, 178 insertions(+), 318 deletions(-)

>>>>> +    addr = qio_channel_socket_get_local_address(listen_sioc, NULL);
>>>>> +    g_assert(addr != NULL);
>>>>
>>>> Missing propagating Error* (observed in v12).
>>>
>>> *If* this is really a programming error: what about &error_abort?
>>
>> assert() informs the compiler that following code will not use addr with a NULL value, I
>> don't think &error_abort does that. This could avoid an error report in code static analyzer.
> 
> I'd expect Coverity to see right through it.
> 
> Static analyzers with a less global view won't, of course.
> 
> For what it's worth, there are about a thousand uses of &error_abort
> outside tests/.  I'm not aware of them confusing static analyzers we
> care about.
> 
> I like &error_abort, because it makes the program crash when we try to
> put the error into &error_abort, with an informative message.  This is
> often right where things go wrong[*].  I personally don't care much
> about the better message, but others do.  The better stack backtrace has
> been quite useful to me.

I concur:

   qemu-system-x86_64: socket family 0 unsupported

VS:

    ERROR:../../net/stream.c:321:net_stream_client_connected: assertion
failed: (addr != NULL)

https://lore.kernel.org/qemu-devel/6fa6b9e5-fede-0f68-752f-0c0d8fa3494f@linaro.org/

> 
> Let's use &error_abort, and throw in the assert when a static analyzer
> we care about needs it.
> 
> 
> [*] error_propagate() messes this up.  That's why the comments in
> error.h ask you to do without when practical.
> 
>
diff mbox series

Patch

diff --git a/net/stream.c b/net/stream.c
index 884f473018da..54c67e14d240 100644
--- a/net/stream.c
+++ b/net/stream.c
@@ -35,48 +35,36 @@ 
 #include "qemu/iov.h"
 #include "qemu/main-loop.h"
 #include "qemu/cutils.h"
+#include "io/channel.h"
+#include "io/channel-socket.h"
+#include "io/net-listener.h"
 
 typedef struct NetStreamState {
     NetClientState nc;
-    int listen_fd;
-    int fd;
+    QIOChannel *listen_ioc;
+    QIONetListener *listener;
+    QIOChannel *ioc;
+    guint ioc_read_tag;
+    guint ioc_write_tag;
     SocketReadState rs;
     unsigned int send_index;      /* number of bytes sent*/
-    bool read_poll;               /* waiting to receive data? */
-    bool write_poll;              /* waiting to transmit data? */
 } NetStreamState;
 
-static void net_stream_send(void *opaque);
-static void net_stream_accept(void *opaque);
-static void net_stream_writable(void *opaque);
+static void net_stream_listen(QIONetListener *listener,
+                              QIOChannelSocket *cioc,
+                              void *opaque);
 
-static void net_stream_update_fd_handler(NetStreamState *s)
+static gboolean net_stream_writable(QIOChannel *ioc,
+                                    GIOCondition condition,
+                                    gpointer data)
 {
-    qemu_set_fd_handler(s->fd,
-                        s->read_poll ? net_stream_send : NULL,
-                        s->write_poll ? net_stream_writable : NULL,
-                        s);
-}
-
-static void net_stream_read_poll(NetStreamState *s, bool enable)
-{
-    s->read_poll = enable;
-    net_stream_update_fd_handler(s);
-}
-
-static void net_stream_write_poll(NetStreamState *s, bool enable)
-{
-    s->write_poll = enable;
-    net_stream_update_fd_handler(s);
-}
-
-static void net_stream_writable(void *opaque)
-{
-    NetStreamState *s = opaque;
+    NetStreamState *s = data;
 
-    net_stream_write_poll(s, false);
+    s->ioc_write_tag = 0;
 
     qemu_flush_queued_packets(&s->nc);
+
+    return G_SOURCE_REMOVE;
 }
 
 static ssize_t net_stream_receive(NetClientState *nc, const uint8_t *buf,
@@ -93,13 +81,15 @@  static ssize_t net_stream_receive(NetClientState *nc, const uint8_t *buf,
             .iov_len  = size,
         },
     };
+    struct iovec local_iov[2];
+    unsigned int nlocal_iov;
     size_t remaining;
     ssize_t ret;
 
     remaining = iov_size(iov, 2) - s->send_index;
-    ret = iov_send(s->fd, iov, 2, s->send_index, remaining);
-
-    if (ret == -1 && errno == EAGAIN) {
+    nlocal_iov = iov_copy(local_iov, 2, iov, 2, s->send_index, remaining);
+    ret = qio_channel_writev(s->ioc, local_iov, nlocal_iov, NULL);
+    if (ret == QIO_CHANNEL_ERR_BLOCK) {
         ret = 0; /* handled further down */
     }
     if (ret == -1) {
@@ -108,19 +98,25 @@  static ssize_t net_stream_receive(NetClientState *nc, const uint8_t *buf,
     }
     if (ret < (ssize_t)remaining) {
         s->send_index += ret;
-        net_stream_write_poll(s, true);
+        s->ioc_write_tag = qio_channel_add_watch(s->ioc, G_IO_OUT,
+                                                 net_stream_writable, s, NULL);
         return 0;
     }
     s->send_index = 0;
     return size;
 }
 
+static gboolean net_stream_send(QIOChannel *ioc,
+                                GIOCondition condition,
+                                gpointer data);
+
 static void net_stream_send_completed(NetClientState *nc, ssize_t len)
 {
     NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
 
-    if (!s->read_poll) {
-        net_stream_read_poll(s, true);
+    if (!s->ioc_read_tag) {
+        s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN,
+                                                net_stream_send, s, NULL);
     }
 }
 
@@ -131,19 +127,24 @@  static void net_stream_rs_finalize(SocketReadState *rs)
     if (qemu_send_packet_async(&s->nc, rs->buf,
                                rs->packet_len,
                                net_stream_send_completed) == 0) {
-        net_stream_read_poll(s, false);
+        if (s->ioc_read_tag) {
+            g_source_remove(s->ioc_read_tag);
+            s->ioc_read_tag = 0;
+        }
     }
 }
 
-static void net_stream_send(void *opaque)
+static gboolean net_stream_send(QIOChannel *ioc,
+                                GIOCondition condition,
+                                gpointer data)
 {
-    NetStreamState *s = opaque;
+    NetStreamState *s = data;
     int size;
     int ret;
-    uint8_t buf1[NET_BUFSIZE];
-    const uint8_t *buf;
+    char buf1[NET_BUFSIZE];
+    const char *buf;
 
-    size = recv(s->fd, buf1, sizeof(buf1), 0);
+    size = qio_channel_read(s->ioc, buf1, sizeof(buf1), NULL);
     if (size < 0) {
         if (errno != EWOULDBLOCK) {
             goto eoc;
@@ -151,51 +152,63 @@  static void net_stream_send(void *opaque)
     } else if (size == 0) {
         /* end of connection */
     eoc:
-        net_stream_read_poll(s, false);
-        net_stream_write_poll(s, false);
-        if (s->listen_fd != -1) {
-            qemu_set_fd_handler(s->listen_fd, net_stream_accept, NULL, s);
+        s->ioc_read_tag = 0;
+        if (s->ioc_write_tag) {
+            g_source_remove(s->ioc_write_tag);
+            s->ioc_write_tag = 0;
         }
-        closesocket(s->fd);
+        if (s->listener) {
+            qio_net_listener_set_client_func(s->listener, net_stream_listen,
+                                             s, NULL);
+        }
+        object_unref(OBJECT(s->ioc));
+        s->ioc = NULL;
 
-        s->fd = -1;
         net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
         s->nc.link_down = true;
         qemu_set_info_str(&s->nc, "");
 
-        return;
+        return G_SOURCE_REMOVE;
     }
     buf = buf1;
 
-    ret = net_fill_rstate(&s->rs, buf, size);
+    ret = net_fill_rstate(&s->rs, (const uint8_t *)buf, size);
 
     if (ret == -1) {
         goto eoc;
     }
+
+    return G_SOURCE_CONTINUE;
 }
 
 static void net_stream_cleanup(NetClientState *nc)
 {
     NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
-    if (s->fd != -1) {
-        net_stream_read_poll(s, false);
-        net_stream_write_poll(s, false);
-        close(s->fd);
-        s->fd = -1;
+    if (s->ioc) {
+        if (QIO_CHANNEL_SOCKET(s->ioc)->fd != -1) {
+            if (s->ioc_read_tag) {
+                g_source_remove(s->ioc_read_tag);
+                s->ioc_read_tag = 0;
+            }
+            if (s->ioc_write_tag) {
+                g_source_remove(s->ioc_write_tag);
+                s->ioc_write_tag = 0;
+            }
+        }
+        object_unref(OBJECT(s->ioc));
+        s->ioc = NULL;
     }
-    if (s->listen_fd != -1) {
-        qemu_set_fd_handler(s->listen_fd, NULL, NULL, NULL);
-        closesocket(s->listen_fd);
-        s->listen_fd = -1;
+    if (s->listen_ioc) {
+        if (s->listener) {
+            qio_net_listener_disconnect(s->listener);
+            object_unref(OBJECT(s->listener));
+            s->listener = NULL;
+        }
+        object_unref(OBJECT(s->listen_ioc));
+        s->listen_ioc = NULL;
     }
 }
 
-static void net_stream_connect(void *opaque)
-{
-    NetStreamState *s = opaque;
-    net_stream_read_poll(s, true);
-}
-
 static NetClientInfo net_stream_info = {
     .type = NET_CLIENT_DRIVER_STREAM,
     .size = sizeof(NetStreamState),
@@ -203,76 +216,66 @@  static NetClientInfo net_stream_info = {
     .cleanup = net_stream_cleanup,
 };
 
-static NetStreamState *net_stream_fd_init(NetClientState *peer,
-                                          const char *model,
-                                          const char *name,
-                                          int fd, int is_connected)
+static void net_stream_listen(QIONetListener *listener,
+                              QIOChannelSocket *cioc,
+                              void *opaque)
 {
-    NetClientState *nc;
-    NetStreamState *s;
-
-    nc = qemu_new_net_client(&net_stream_info, peer, model, name);
+    NetStreamState *s = opaque;
+    SocketAddress *addr;
+    char *uri;
 
-    qemu_set_info_str(nc, "fd=%d", fd);
+    object_ref(OBJECT(cioc));
 
-    s = DO_UPCAST(NetStreamState, nc, nc);
+    qio_net_listener_set_client_func(s->listener, NULL, s, NULL);
 
-    s->fd = fd;
-    s->listen_fd = -1;
-    net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
+    s->ioc = QIO_CHANNEL(cioc);
+    qio_channel_set_name(s->ioc, "stream-server");
+    s->nc.link_down = false;
 
-    /* Disable Nagle algorithm on TCP sockets to reduce latency */
-    socket_set_nodelay(fd);
+    s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
+                                            s, NULL);
 
-    if (is_connected) {
-        net_stream_connect(s);
+    if (cioc->localAddr.ss_family == AF_UNIX) {
+        addr = qio_channel_socket_get_local_address(cioc, NULL);
     } else {
-        qemu_set_fd_handler(s->fd, NULL, net_stream_connect, s);
+        addr = qio_channel_socket_get_remote_address(cioc, NULL);
     }
-    return s;
+    g_assert(addr != NULL);
+    uri = socket_uri(addr);
+    qemu_set_info_str(&s->nc, uri);
+    g_free(uri);
+    qapi_free_SocketAddress(addr);
 }
 
-static void net_stream_accept(void *opaque)
+static void net_stream_server_listening(QIOTask *task, gpointer opaque)
 {
     NetStreamState *s = opaque;
-    struct sockaddr_storage saddr;
-    socklen_t len;
-    int fd;
-
-    for (;;) {
-        len = sizeof(saddr);
-        fd = qemu_accept(s->listen_fd, (struct sockaddr *)&saddr, &len);
-        if (fd < 0 && errno != EINTR) {
-            return;
-        } else if (fd >= 0) {
-            qemu_set_fd_handler(s->listen_fd, NULL, NULL, NULL);
-            break;
-        }
-    }
+    QIOChannelSocket *listen_sioc = QIO_CHANNEL_SOCKET(s->listen_ioc);
+    SocketAddress *addr;
+    int ret;
 
-    s->fd = fd;
-    s->nc.link_down = false;
-    net_stream_connect(s);
-    switch (saddr.ss_family) {
-    case AF_INET: {
-        struct sockaddr_in *saddr_in = (struct sockaddr_in *)&saddr;
-
-        qemu_set_info_str(&s->nc, "connection from %s:%d",
-                          inet_ntoa(saddr_in->sin_addr),
-                          ntohs(saddr_in->sin_port));
-        break;
+    if (listen_sioc->fd < 0) {
+        qemu_set_info_str(&s->nc, "connection error");
+        return;
     }
-    case AF_UNIX: {
-        struct sockaddr_un saddr_un;
 
-        len = sizeof(saddr_un);
-        getsockname(s->listen_fd, (struct sockaddr *)&saddr_un, &len);
-        qemu_set_info_str(&s->nc, "connect from %s", saddr_un.sun_path);
-        break;
-    }
-    default:
-        g_assert_not_reached();
+    addr = qio_channel_socket_get_local_address(listen_sioc, NULL);
+    g_assert(addr != NULL);
+    ret = qemu_socket_try_set_nonblock(listen_sioc->fd);
+    if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
+        qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
+                          addr->u.fd.str, -ret);
+        return;
     }
+    g_assert(ret == 0);
+    qapi_free_SocketAddress(addr);
+
+    s->nc.link_down = true;
+    s->listener = qio_net_listener_new();
+
+    net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
+    qio_net_listener_set_client_func(s->listener, net_stream_listen, s, NULL);
+    qio_net_listener_add(s->listener, listen_sioc);
 }
 
 static int net_stream_server_init(NetClientState *peer,
@@ -283,105 +286,61 @@  static int net_stream_server_init(NetClientState *peer,
 {
     NetClientState *nc;
     NetStreamState *s;
-    int fd, ret;
+    QIOChannelSocket *listen_sioc = qio_channel_socket_new();
 
-    switch (addr->type) {
-    case SOCKET_ADDRESS_TYPE_INET: {
-        struct sockaddr_in saddr_in;
-
-        if (convert_host_port(&saddr_in, addr->u.inet.host, addr->u.inet.port,
-                              errp) < 0) {
-            return -1;
-        }
-
-        fd = qemu_socket(PF_INET, SOCK_STREAM, 0);
-        if (fd < 0) {
-            error_setg_errno(errp, errno, "can't create stream socket");
-            return -1;
-        }
-        qemu_socket_set_nonblock(fd);
+    nc = qemu_new_net_client(&net_stream_info, peer, model, name);
+    s = DO_UPCAST(NetStreamState, nc, nc);
 
-        socket_set_fast_reuse(fd);
+    s->listen_ioc = QIO_CHANNEL(listen_sioc);
+    qio_channel_socket_listen_async(listen_sioc, addr, 0,
+                                    net_stream_server_listening, s,
+                                    NULL, NULL);
 
-        ret = bind(fd, (struct sockaddr *)&saddr_in, sizeof(saddr_in));
-        if (ret < 0) {
-            error_setg_errno(errp, errno, "can't bind ip=%s to socket",
-                             inet_ntoa(saddr_in.sin_addr));
-            closesocket(fd);
-            return -1;
-        }
-        break;
-    }
-    case SOCKET_ADDRESS_TYPE_UNIX: {
-        struct sockaddr_un saddr_un;
-
-        ret = unlink(addr->u.q_unix.path);
-        if (ret < 0 && errno != ENOENT) {
-            error_setg_errno(errp, errno, "failed to unlink socket %s",
-                             addr->u.q_unix.path);
-            return -1;
-        }
+    return 0;
+}
 
-        saddr_un.sun_family = PF_UNIX;
-        ret = snprintf(saddr_un.sun_path, sizeof(saddr_un.sun_path), "%s",
-                       addr->u.q_unix.path);
-        if (ret < 0 || ret >= sizeof(saddr_un.sun_path)) {
-            error_setg(errp, "UNIX socket path '%s' is too long",
-                       addr->u.q_unix.path);
-            error_append_hint(errp, "Path must be less than %zu bytes\n",
-                              sizeof(saddr_un.sun_path));
-            return -1;
-        }
+static void net_stream_client_connected(QIOTask *task, gpointer opaque)
+{
+    NetStreamState *s = opaque;
+    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(s->ioc);
+    SocketAddress *addr;
+    gchar *uri;
+    int ret;
 
-        fd = qemu_socket(PF_UNIX, SOCK_STREAM, 0);
-        if (fd < 0) {
-            error_setg_errno(errp, errno, "can't create stream socket");
-            return -1;
-        }
-        qemu_socket_set_nonblock(fd);
-
-        ret = bind(fd, (struct sockaddr *)&saddr_un, sizeof(saddr_un));
-        if (ret < 0) {
-            error_setg_errno(errp, errno, "can't create socket with path: %s",
-                             saddr_un.sun_path);
-            closesocket(fd);
-            return -1;
-        }
-        break;
-    }
-    case SOCKET_ADDRESS_TYPE_FD:
-        fd = monitor_fd_param(monitor_cur(), addr->u.fd.str, errp);
-        if (fd == -1) {
-            return -1;
-        }
-        ret = qemu_socket_try_set_nonblock(fd);
-        if (ret < 0) {
-            error_setg_errno(errp, -ret, "%s: Can't use file descriptor %d",
-                             name, fd);
-            return -1;
-        }
-        break;
-    default:
-        error_setg(errp, "only support inet or fd type");
-        return -1;
+    if (sioc->fd < 0) {
+        qemu_set_info_str(&s->nc, "connection error");
+        goto error;
     }
 
-    ret = listen(fd, 0);
-    if (ret < 0) {
-        error_setg_errno(errp, errno, "can't listen on socket");
-        closesocket(fd);
-        return -1;
+    addr = qio_channel_socket_get_remote_address(sioc, NULL);
+    g_assert(addr != NULL);
+    uri = socket_uri(addr);
+    qemu_set_info_str(&s->nc, uri);
+    g_free(uri);
+
+    ret = qemu_socket_try_set_nonblock(sioc->fd);
+    if (addr->type == SOCKET_ADDRESS_TYPE_FD && ret < 0) {
+        qemu_set_info_str(&s->nc, "can't use file descriptor %s (errno %d)",
+                          addr->u.fd.str, -ret);
+        qapi_free_SocketAddress(addr);
+        goto error;
     }
+    g_assert(ret == 0);
 
-    nc = qemu_new_net_client(&net_stream_info, peer, model, name);
-    s = DO_UPCAST(NetStreamState, nc, nc);
-    s->fd = -1;
-    s->listen_fd = fd;
-    s->nc.link_down = true;
     net_socket_rs_init(&s->rs, net_stream_rs_finalize, false);
 
-    qemu_set_fd_handler(s->listen_fd, net_stream_accept, NULL, s);
-    return 0;
+    /* Disable Nagle algorithm on TCP sockets to reduce latency */
+    qio_channel_set_delay(s->ioc, false);
+
+    s->ioc_read_tag = qio_channel_add_watch(s->ioc, G_IO_IN, net_stream_send,
+                                            s, NULL);
+    s->nc.link_down = false;
+    qapi_free_SocketAddress(addr);
+
+    return;
+error:
+    object_unref(OBJECT(s->ioc));
+    s->ioc = NULL;
 }
 
 static int net_stream_client_init(NetClientState *peer,
@@ -391,118 +350,19 @@  static int net_stream_client_init(NetClientState *peer,
                                   Error **errp)
 {
     NetStreamState *s;
-    struct sockaddr_in saddr_in;
-    struct sockaddr_un saddr_un;
-    int fd, connected, ret;
-
-    switch (addr->type) {
-    case SOCKET_ADDRESS_TYPE_INET:
-        if (convert_host_port(&saddr_in, addr->u.inet.host, addr->u.inet.port,
-                              errp) < 0) {
-            return -1;
-        }
+    NetClientState *nc;
+    QIOChannelSocket *sioc = qio_channel_socket_new();
 
-        fd = qemu_socket(PF_INET, SOCK_STREAM, 0);
-        if (fd < 0) {
-            error_setg_errno(errp, errno, "can't create stream socket");
-            return -1;
-        }
-        qemu_socket_set_nonblock(fd);
-
-        connected = 0;
-        for (;;) {
-            ret = connect(fd, (struct sockaddr *)&saddr_in, sizeof(saddr_in));
-            if (ret < 0) {
-                if (errno == EINTR || errno == EWOULDBLOCK) {
-                    /* continue */
-                } else if (errno == EINPROGRESS ||
-                           errno == EALREADY) {
-                    break;
-                } else {
-                    error_setg_errno(errp, errno, "can't connect socket");
-                    closesocket(fd);
-                    return -1;
-                }
-            } else {
-                connected = 1;
-                break;
-            }
-        }
-        break;
-    case SOCKET_ADDRESS_TYPE_UNIX:
-        saddr_un.sun_family = PF_UNIX;
-        ret = snprintf(saddr_un.sun_path, sizeof(saddr_un.sun_path), "%s",
-                       addr->u.q_unix.path);
-        if (ret < 0 || ret >= sizeof(saddr_un.sun_path)) {
-            error_setg(errp, "UNIX socket path '%s' is too long",
-                       addr->u.q_unix.path);
-            error_append_hint(errp, "Path must be less than %zu bytes\n",
-                              sizeof(saddr_un.sun_path));
-            return -1;
-        }
+    nc = qemu_new_net_client(&net_stream_info, peer, model, name);
+    s = DO_UPCAST(NetStreamState, nc, nc);
 
-        fd = qemu_socket(PF_UNIX, SOCK_STREAM, 0);
-        if (fd < 0) {
-            error_setg_errno(errp, errno, "can't create stream socket");
-            return -1;
-        }
-        qemu_socket_set_nonblock(fd);
-
-        connected = 0;
-        for (;;) {
-            ret = connect(fd, (struct sockaddr *)&saddr_un, sizeof(saddr_un));
-            if (ret < 0) {
-                if (errno == EINTR || errno == EWOULDBLOCK) {
-                    /* continue */
-                } else if (errno == EAGAIN ||
-                           errno == EALREADY) {
-                    break;
-                } else {
-                    error_setg_errno(errp, errno, "can't connect socket");
-                    closesocket(fd);
-                    return -1;
-                }
-            } else {
-                connected = 1;
-                break;
-            }
-        }
-        break;
-    case SOCKET_ADDRESS_TYPE_FD:
-        fd = monitor_fd_param(monitor_cur(), addr->u.fd.str, errp);
-        if (fd == -1) {
-            return -1;
-        }
-        ret = qemu_socket_try_set_nonblock(fd);
-        if (ret < 0) {
-            error_setg_errno(errp, -ret, "%s: Can't use file descriptor %d",
-                             name, fd);
-            return -1;
-        }
-        connected = 1;
-        break;
-    default:
-        error_setg(errp, "only support inet, unix or fd type");
-        return -1;
-    }
+    s->ioc = QIO_CHANNEL(sioc);
+    s->nc.link_down = true;
+
+    qio_channel_socket_connect_async(sioc, addr,
+                                     net_stream_client_connected, s,
+                                     NULL, NULL);
 
-    s = net_stream_fd_init(peer, model, name, fd, connected);
-
-    switch (addr->type) {
-    case SOCKET_ADDRESS_TYPE_INET:
-        qemu_set_info_str(&s->nc, "connect to %s:%d",
-                          inet_ntoa(saddr_in.sin_addr),
-                          ntohs(saddr_in.sin_port));
-        break;
-    case SOCKET_ADDRESS_TYPE_UNIX:
-        qemu_set_info_str(&s->nc, " connect to %s", saddr_un.sun_path);
-        break;
-    case SOCKET_ADDRESS_TYPE_FD:
-        qemu_set_info_str(&s->nc, "connect to fd %d", fd);
-        break;
-    default:
-        g_assert_not_reached();
-    }
     return 0;
 }
 
diff --git a/qemu-options.hx b/qemu-options.hx
index fafb214cb89f..a91f96a264cc 100644
--- a/qemu-options.hx
+++ b/qemu-options.hx
@@ -2772,8 +2772,8 @@  DEF("netdev", HAS_ARG, QEMU_OPTION_netdev,
     "-netdev socket,id=str[,fd=h][,udp=host:port][,localaddr=host:port]\n"
     "                configure a network backend to connect to another network\n"
     "                using an UDP tunnel\n"
-    "-netdev stream,id=str[,server=on|off],addr.type=inet,addr.host=host,addr.port=port\n"
-    "-netdev stream,id=str[,server=on|off],addr.type=unix,addr.path=path\n"
+    "-netdev stream,id=str[,server=on|off],addr.type=inet,addr.host=host,addr.port=port[,to=maxport][,numeric=on|off][,keep-alive=on|off][,mptcp=on|off][,addr.ipv4=on|off][,addr.ipv6=on|off]\n"
+    "-netdev stream,id=str[,server=on|off],addr.type=unix,addr.path=path[,abstract=on|off][,tight=on|off]\n"
     "-netdev stream,id=str[,server=on|off],addr.type=fd,addr.str=file-descriptor\n"
     "                configure a network backend to connect to another network\n"
     "                using a socket connection in stream mode.\n"