diff mbox series

net: stream: add a new option to automatically reconnect

Message ID 20221110073400.968475-1-lvivier@redhat.com (mailing list archive)
State New, archived
Headers show
Series net: stream: add a new option to automatically reconnect | expand

Commit Message

Laurent Vivier Nov. 10, 2022, 7:34 a.m. UTC
In stream mode, if the server shuts down there is currently
no way to reconnect the client to a new server without removing
the NIC device and the netdev backend (or to reboot).

This patch introduces a reconnect option that specifies a delay
to try to reconnect with the same parameters.

Add a new test in qtest to test the reconnect option and the
connect/disconnect events.

Signed-off-by: Laurent Vivier <lvivier@redhat.com>
---

Notes:
    Based-on: <20221109130301.790679-1-lvivier@redhat.com>

 net/stream.c                |  53 ++++++++++++++++++-
 qapi/net.json               |   6 ++-
 qemu-options.hx             |   6 +--
 tests/qtest/netdev-socket.c | 100 ++++++++++++++++++++++++++++++++++++
 4 files changed, 160 insertions(+), 5 deletions(-)

Comments

Markus Armbruster Nov. 10, 2022, 11:22 a.m. UTC | #1
Laurent Vivier <lvivier@redhat.com> writes:

> In stream mode, if the server shuts down there is currently
> no way to reconnect the client to a new server without removing
> the NIC device and the netdev backend (or to reboot).
>
> This patch introduces a reconnect option that specifies a delay
> to try to reconnect with the same parameters.
>
> Add a new test in qtest to test the reconnect option and the
> connect/disconnect events.
>
> Signed-off-by: Laurent Vivier <lvivier@redhat.com>
> ---

[...]

> diff --git a/qapi/net.json b/qapi/net.json
> index 522ac582edeb..5b72c936b3ac 100644
> --- a/qapi/net.json
> +++ b/qapi/net.json
> @@ -585,6 +585,9 @@
>  # @addr: socket address to listen on (server=true)
>  #        or connect to (server=false)
>  # @server: create server socket (default: false)
> +# @reconnect: For a client socket, if a socket is disconnected,
> +#             then attempt a reconnect after the given number of seconds.
> +#             Setting this to zero disables this function. (default: 0)

Double-checking: it attempts to reconnect *once*.  Correct?

Are we sure multiples of seconds will be fine?

"For a client socket": what happens for a server socket?

>  #
>  # Only SocketAddress types 'unix', 'inet' and 'fd' are supported.
>  #
> @@ -593,7 +596,8 @@
>  { 'struct': 'NetdevStreamOptions',
>    'data': {
>      'addr':   'SocketAddress',
> -    '*server': 'bool' } }
> +    '*server': 'bool',
> +    '*reconnect': 'uint32' } }
>  
>  ##
>  # @NetdevDgramOptions:

[...]
Daniel P. Berrangé Nov. 10, 2022, 11:37 a.m. UTC | #2
On Thu, Nov 10, 2022 at 12:22:21PM +0100, Markus Armbruster wrote:
> Laurent Vivier <lvivier@redhat.com> writes:
> 
> > In stream mode, if the server shuts down there is currently
> > no way to reconnect the client to a new server without removing
> > the NIC device and the netdev backend (or to reboot).
> >
> > This patch introduces a reconnect option that specifies a delay
> > to try to reconnect with the same parameters.
> >
> > Add a new test in qtest to test the reconnect option and the
> > connect/disconnect events.
> >
> > Signed-off-by: Laurent Vivier <lvivier@redhat.com>
> > ---
> 
> [...]
> 
> > diff --git a/qapi/net.json b/qapi/net.json
> > index 522ac582edeb..5b72c936b3ac 100644
> > --- a/qapi/net.json
> > +++ b/qapi/net.json
> > @@ -585,6 +585,9 @@
> >  # @addr: socket address to listen on (server=true)
> >  #        or connect to (server=false)
> >  # @server: create server socket (default: false)
> > +# @reconnect: For a client socket, if a socket is disconnected,
> > +#             then attempt a reconnect after the given number of seconds.
> > +#             Setting this to zero disables this function. (default: 0)
> 
> Double-checking: it attempts to reconnect *once*.  Correct?

Repeatedly.

IIUC, this doesn't just try to reconnect a failed connection, it'll
reconnect the initial connection if the server doesn't listen when
QEMU first startup.

> Are we sure multiples of seconds will be fine?

FWIW, this text & behaviour (AFAICT) is identical to the same named
option against ChardevSocket, which is good for consistency.

Since the reconnect is continuous until connected, you don't
want too frequent reconnects otherwise it'll burn CPU time.

That said, if it wasn't for the pre-existing chardev option,
I would have suggested milliseconds be worth considering as
units. Arguably it should probaby have an increasing backoff.

We could achieve that without changing semantics in a way that
would matter to apps, by declaring that 'reconnect' is the
maximum reconnect time. So start a reconnect at 50ms, 100ms,
200ms, 500ms, 1s, .... $reconnect

IOW, I think its okey to set reconnect units to seconds, and
we have the option to retrofit the fast start + backwoff
later.

> "For a client socket": what happens for a server socket?


With regards,
Daniel
diff mbox series

Patch

diff --git a/net/stream.c b/net/stream.c
index 53b7040cc417..c1523e8190cc 100644
--- a/net/stream.c
+++ b/net/stream.c
@@ -39,6 +39,8 @@ 
 #include "io/channel-socket.h"
 #include "io/net-listener.h"
 #include "qapi/qapi-events-net.h"
+#include "qapi/qapi-visit-sockets.h"
+#include "qapi/clone-visitor.h"
 
 typedef struct NetStreamState {
     NetClientState nc;
@@ -49,11 +51,15 @@  typedef struct NetStreamState {
     guint ioc_write_tag;
     SocketReadState rs;
     unsigned int send_index;      /* number of bytes sent*/
+    uint32_t reconnect;
+    guint timer_tag;
+    SocketAddress *addr;
 } NetStreamState;
 
 static void net_stream_listen(QIONetListener *listener,
                               QIOChannelSocket *cioc,
                               void *opaque);
+static void net_stream_arm_reconnect(NetStreamState *s);
 
 static gboolean net_stream_writable(QIOChannel *ioc,
                                     GIOCondition condition,
@@ -170,6 +176,7 @@  static gboolean net_stream_send(QIOChannel *ioc,
         qemu_set_info_str(&s->nc, "");
 
         qapi_event_send_netdev_stream_disconnected(s->nc.name);
+        net_stream_arm_reconnect(s);
 
         return G_SOURCE_REMOVE;
     }
@@ -187,6 +194,14 @@  static gboolean net_stream_send(QIOChannel *ioc,
 static void net_stream_cleanup(NetClientState *nc)
 {
     NetStreamState *s = DO_UPCAST(NetStreamState, nc, nc);
+    if (s->timer_tag) {
+        g_source_remove(s->timer_tag);
+        s->timer_tag = 0;
+    }
+    if (s->addr) {
+        qapi_free_SocketAddress(s->addr);
+        s->addr = NULL;
+    }
     if (s->ioc) {
         if (QIO_CHANNEL_SOCKET(s->ioc)->fd != -1) {
             if (s->ioc_read_tag) {
@@ -346,12 +361,37 @@  static void net_stream_client_connected(QIOTask *task, gpointer opaque)
 error:
     object_unref(OBJECT(s->ioc));
     s->ioc = NULL;
+    net_stream_arm_reconnect(s);
+}
+
+static gboolean net_stream_reconnect(gpointer data)
+{
+    NetStreamState *s = data;
+    QIOChannelSocket *sioc;
+
+    s->timer_tag = 0;
+
+    sioc = qio_channel_socket_new();
+    s->ioc = QIO_CHANNEL(sioc);
+    qio_channel_socket_connect_async(sioc, s->addr,
+                                     net_stream_client_connected, s,
+                                     NULL, NULL);
+    return G_SOURCE_REMOVE;
+}
+
+static void net_stream_arm_reconnect(NetStreamState *s)
+{
+    if (s->reconnect && s->timer_tag == 0) {
+        s->timer_tag = g_timeout_add_seconds(s->reconnect,
+                                             net_stream_reconnect, s);
+    }
 }
 
 static int net_stream_client_init(NetClientState *peer,
                                   const char *model,
                                   const char *name,
                                   SocketAddress *addr,
+                                  uint32_t reconnect,
                                   Error **errp)
 {
     NetStreamState *s;
@@ -364,6 +404,10 @@  static int net_stream_client_init(NetClientState *peer,
     s->ioc = QIO_CHANNEL(sioc);
     s->nc.link_down = true;
 
+    s->reconnect = reconnect;
+    if (reconnect) {
+        s->addr = QAPI_CLONE(SocketAddress, addr);
+    }
     qio_channel_socket_connect_async(sioc, addr,
                                      net_stream_client_connected, s,
                                      NULL, NULL);
@@ -380,7 +424,14 @@  int net_init_stream(const Netdev *netdev, const char *name,
     sock = &netdev->u.stream;
 
     if (!sock->has_server || !sock->server) {
-        return net_stream_client_init(peer, "stream", name, sock->addr, errp);
+        return net_stream_client_init(peer, "stream", name, sock->addr,
+                                      sock->has_reconnect ? sock->reconnect : 0,
+                                      errp);
+    }
+    if (sock->has_reconnect) {
+        error_setg(errp, "'reconnect' option is incompatible with "
+                         "socket in server mode");
+        return -1;
     }
     return net_stream_server_init(peer, "stream", name, sock->addr, errp);
 }
diff --git a/qapi/net.json b/qapi/net.json
index 522ac582edeb..5b72c936b3ac 100644
--- a/qapi/net.json
+++ b/qapi/net.json
@@ -585,6 +585,9 @@ 
 # @addr: socket address to listen on (server=true)
 #        or connect to (server=false)
 # @server: create server socket (default: false)
+# @reconnect: For a client socket, if a socket is disconnected,
+#             then attempt a reconnect after the given number of seconds.
+#             Setting this to zero disables this function. (default: 0)
 #
 # Only SocketAddress types 'unix', 'inet' and 'fd' are supported.
 #
@@ -593,7 +596,8 @@ 
 { 'struct': 'NetdevStreamOptions',
   'data': {
     'addr':   'SocketAddress',
-    '*server': 'bool' } }
+    '*server': 'bool',
+    '*reconnect': 'uint32' } }
 
 ##
 # @NetdevDgramOptions:
diff --git a/qemu-options.hx b/qemu-options.hx
index 8b8a4a5d016f..528ab1ffb5e2 100644
--- a/qemu-options.hx
+++ b/qemu-options.hx
@@ -2766,9 +2766,9 @@  DEF("netdev", HAS_ARG, QEMU_OPTION_netdev,
     "-netdev socket,id=str[,fd=h][,udp=host:port][,localaddr=host:port]\n"
     "                configure a network backend to connect to another network\n"
     "                using an UDP tunnel\n"
-    "-netdev stream,id=str[,server=on|off],addr.type=inet,addr.host=host,addr.port=port[,to=maxport][,numeric=on|off][,keep-alive=on|off][,mptcp=on|off][,addr.ipv4=on|off][,addr.ipv6=on|off]\n"
-    "-netdev stream,id=str[,server=on|off],addr.type=unix,addr.path=path[,abstract=on|off][,tight=on|off]\n"
-    "-netdev stream,id=str[,server=on|off],addr.type=fd,addr.str=file-descriptor\n"
+    "-netdev stream,id=str[,server=on|off],addr.type=inet,addr.host=host,addr.port=port[,to=maxport][,numeric=on|off][,keep-alive=on|off][,mptcp=on|off][,addr.ipv4=on|off][,addr.ipv6=on|off][,reconnect=seconds]\n"
+    "-netdev stream,id=str[,server=on|off],addr.type=unix,addr.path=path[,abstract=on|off][,tight=on|off][,reconnect=seconds]\n"
+    "-netdev stream,id=str[,server=on|off],addr.type=fd,addr.str=file-descriptor[,reconnect=seconds]\n"
     "                configure a network backend to connect to another network\n"
     "                using a socket connection in stream mode.\n"
     "-netdev dgram,id=str,remote.type=inet,remote.host=maddr,remote.port=port[,local.type=inet,local.host=addr]\n"
diff --git a/tests/qtest/netdev-socket.c b/tests/qtest/netdev-socket.c
index b6b59244a282..3d22bad6415f 100644
--- a/tests/qtest/netdev-socket.c
+++ b/tests/qtest/netdev-socket.c
@@ -10,6 +10,10 @@ 
 #include <glib/gstdio.h>
 #include "../unit/socket-helpers.h"
 #include "libqtest.h"
+#include "qapi/qmp/qstring.h"
+#include "qemu/sockets.h"
+#include "qapi/qobject-input-visitor.h"
+#include "qapi/qapi-visit-sockets.h"
 
 #define CONNECTION_TIMEOUT    5
 
@@ -144,6 +148,101 @@  static void test_stream_inet_ipv4(void)
     qtest_quit(qts0);
 }
 
+static void wait_stream_connected(QTestState *qts, const char *id,
+                                  SocketAddress **addr)
+{
+    QDict *resp, *data;
+    QString *qstr;
+    QObject *obj;
+    Visitor *v = NULL;
+
+    resp = qtest_qmp_eventwait_ref(qts, "NETDEV_STREAM_CONNECTED");
+    g_assert_nonnull(resp);
+    data = qdict_get_qdict(resp, "data");
+    g_assert_nonnull(data);
+
+    qstr = qobject_to(QString, qdict_get(data, "netdev-id"));
+    g_assert_nonnull(data);
+
+    g_assert(!strcmp(qstring_get_str(qstr), id));
+
+    obj = qdict_get(data, "addr");
+
+    v = qobject_input_visitor_new(obj);
+    visit_type_SocketAddress(v, NULL, addr, NULL);
+    visit_free(v);
+    qobject_unref(resp);
+}
+
+static void wait_stream_disconnected(QTestState *qts, const char *id)
+{
+    QDict *resp, *data;
+    QString *qstr;
+
+    resp = qtest_qmp_eventwait_ref(qts, "NETDEV_STREAM_DISCONNECTED");
+    g_assert_nonnull(resp);
+    data = qdict_get_qdict(resp, "data");
+    g_assert_nonnull(data);
+
+    qstr = qobject_to(QString, qdict_get(data, "netdev-id"));
+    g_assert_nonnull(data);
+
+    g_assert(!strcmp(qstring_get_str(qstr), id));
+    qobject_unref(resp);
+}
+
+static void test_stream_inet_reconnect(void)
+{
+    QTestState *qts0, *qts1;
+    int port;
+    SocketAddress *addr;
+
+    port = inet_get_free_port(false);
+    qts0 = qtest_initf("-nodefaults -M none "
+                       "-netdev stream,id=st0,server=true,addr.type=inet,"
+                       "addr.ipv4=on,addr.ipv6=off,"
+                       "addr.host=127.0.0.1,addr.port=%d", port);
+
+    EXPECT_STATE(qts0, "st0: index=0,type=stream,\r\n", 0);
+
+    qts1 = qtest_initf("-nodefaults -M none "
+                       "-netdev stream,server=false,id=st0,addr.type=inet,"
+                       "addr.ipv4=on,addr.ipv6=off,reconnect=1,"
+                       "addr.host=127.0.0.1,addr.port=%d", port);
+
+    wait_stream_connected(qts0, "st0", &addr);
+    g_assert_cmpint(addr->type, ==, SOCKET_ADDRESS_TYPE_INET);
+    g_assert_cmpstr(addr->u.inet.host, ==, "127.0.0.1");
+    qapi_free_SocketAddress(addr);
+
+    /* kill server */
+    qtest_quit(qts0);
+
+    /* check client has been disconnected */
+    wait_stream_disconnected(qts1, "st0");
+
+    /* restart server */
+    qts0 = qtest_initf("-nodefaults -M none "
+                       "-netdev stream,id=st0,server=true,addr.type=inet,"
+                       "addr.ipv4=on,addr.ipv6=off,"
+                       "addr.host=127.0.0.1,addr.port=%d", port);
+
+    /* wait connection events*/
+    wait_stream_connected(qts0, "st0", &addr);
+    g_assert_cmpint(addr->type, ==, SOCKET_ADDRESS_TYPE_INET);
+    g_assert_cmpstr(addr->u.inet.host, ==, "127.0.0.1");
+    qapi_free_SocketAddress(addr);
+
+    wait_stream_connected(qts1, "st0", &addr);
+    g_assert_cmpint(addr->type, ==, SOCKET_ADDRESS_TYPE_INET);
+    g_assert_cmpstr(addr->u.inet.host, ==, "127.0.0.1");
+    g_assert_cmpint(atoi(addr->u.inet.port), ==, port);
+    qapi_free_SocketAddress(addr);
+
+    qtest_quit(qts1);
+    qtest_quit(qts0);
+}
+
 static void test_stream_inet_ipv6(void)
 {
     QTestState *qts0, *qts1;
@@ -411,6 +510,7 @@  int main(int argc, char **argv)
         qtest_add_func("/netdev/stream/inet/ipv4", test_stream_inet_ipv4);
         qtest_add_func("/netdev/dgram/inet", test_dgram_inet);
         qtest_add_func("/netdev/dgram/mcast", test_dgram_mcast);
+        qtest_add_func("/netdev/stream/inet/reconnect", test_stream_inet_reconnect);
     }
     if (has_ipv6) {
         qtest_add_func("/netdev/stream/inet/ipv6", test_stream_inet_ipv6);