Message ID | 20220721195620.123837-6-het.gala@nutanix.com (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
Series | multifd: Multiple interface support on top of Multifd | expand |
In $SUBJECT s/multifd:/io:/ as this is not migration related. On Thu, Jul 21, 2022 at 07:56:18PM +0000, Het Gala wrote: > i) Binding of the socket to source ip address and port on the non-default > interface has been implemented for multi-FD connection, which was not > necessary earlier because the binding was on the default interface itself. > > ii) Created an end to end connection between all multi-FD source and > destination pairs. > > Suggested-by: Manish Mishra <manish.mishra@nutanix.com> > Signed-off-by: Het Gala <het.gala@nutanix.com> > --- > include/io/channel-socket.h | 44 ++++++++++++++++ > include/qemu/sockets.h | 6 ++- > io/channel-socket.c | 93 ++++++++++++++++++++++++++-------- > migration/socket.c | 4 +- > tests/unit/test-util-sockets.c | 16 +++--- > util/qemu-sockets.c | 90 +++++++++++++++++++++++--------- > 6 files changed, 196 insertions(+), 57 deletions(-) > > diff --git a/include/io/channel-socket.h b/include/io/channel-socket.h > index 513c428fe4..8168866b06 100644 > --- a/include/io/channel-socket.h > +++ b/include/io/channel-socket.h > @@ -211,6 +211,50 @@ void qio_channel_socket_dgram_async(QIOChannelSocket *ioc, > GMainContext *context); > > > +/** > + * qio_channel_socket_connect_all_sync: This needs to be called qio_channel_socket_connect_full_sync to match the naming conventions in use in IO code. > + * @ioc: the socket channel object > + * @dst_addr: the destination address to connect to > + * @src_addr: the source address to be connected 'the optional source address to bind to' > + * @errp: pointer to a NULL-initialized error object > + * > + * Attempt to connect to the address @dst_addr with @src_addr. * Attempt to connect to the address @dst_addr. If @src_addr * is non-NULL, it will be bound to in order to control outbound * interface routing. > + * This method will run in the foreground so the caller will not > + * regain execution control until the connection is established or > + * an error occurs. > + */ > +int qio_channel_socket_connect_all_sync(QIOChannelSocket *ioc, > + SocketAddress *dst_addr, > + SocketAddress *src_addr, > + Error **errp); Vertical mis-alignment of parameters > + > +/** > + * qio_channel_socket_connect_all_async: Needs to be qio_channel_socket_connect_full_async > + * @ioc: the socket channel object > + * @dst_addr: the destination address to connect to @src_addr needs to be placed here... > + * @callback: the function to invoke on completion > + * @opaque: user data to pass to @callback > + * @destroy: the function to free @opaque > + * @context: the context to run the async task. If %NULL, the default > + * context will be used. > + * @src_addr: the source address to be connected ...not here and same note about the docs comment > + * > + * Attempt to connect to the address @dst_addr with the @src_addr. Same note about the docs comment > + * This method will run in the background so the caller will regain > + * execution control immediately. The function @callback > + * will be invoked on completion or failure. The @dst_addr > + * parameter will be copied, so may be freed as soon > + * as this function returns without waiting for completion. > + */ > +void qio_channel_socket_connect_all_async(QIOChannelSocket *ioc, > + SocketAddress *dst_addr, > + QIOTaskFunc callback, > + gpointer opaque, > + GDestroyNotify destroy, > + GMainContext *context, > + SocketAddress *src_addr); > + > + > /** > * qio_channel_socket_get_local_address: > * @ioc: the socket channel object > diff --git a/migration/socket.c b/migration/socket.c > index dab872a0fe..69fda774ba 100644 > --- a/migration/socket.c > +++ b/migration/socket.c > @@ -57,8 +57,8 @@ int outgoing_param_total_multifds(void) > void socket_send_channel_create(QIOTaskFunc f, void *data) > { > QIOChannelSocket *sioc = qio_channel_socket_new(); > - qio_channel_socket_connect_async(sioc, outgoing_args.saddr, > - f, data, NULL, NULL); > + qio_channel_socket_connect_all_async(sioc, outgoing_args.saddr, > + f, data, NULL, NULL, NULL); > } Don't change this call at all until the next patch which actually needs to pass a non-NULL parameter for src. > QIOChannel *socket_send_channel_create_sync(Error **errp) > diff --git a/tests/unit/test-util-sockets.c b/tests/unit/test-util-sockets.c > index 63909ccb2b..aa26630045 100644 > --- a/tests/unit/test-util-sockets.c > +++ b/tests/unit/test-util-sockets.c > @@ -89,7 +89,7 @@ static void test_socket_fd_pass_name_good(void) > addr.type = SOCKET_ADDRESS_TYPE_FD; > addr.u.fd.str = g_strdup(mon_fdname); > > - fd = socket_connect(&addr, &error_abort); > + fd = socket_connect(&addr, NULL, &error_abort); > g_assert_cmpint(fd, !=, -1); > g_assert_cmpint(fd, !=, mon_fd); > close(fd); > @@ -121,7 +121,7 @@ static void test_socket_fd_pass_name_bad(void) > addr.type = SOCKET_ADDRESS_TYPE_FD; > addr.u.fd.str = g_strdup(mon_fdname); > > - fd = socket_connect(&addr, &err); > + fd = socket_connect(&addr, NULL, &err); > g_assert_cmpint(fd, ==, -1); > error_free_or_abort(&err); > > @@ -148,7 +148,7 @@ static void test_socket_fd_pass_name_nomon(void) > addr.type = SOCKET_ADDRESS_TYPE_FD; > addr.u.fd.str = g_strdup("myfd"); > > - fd = socket_connect(&addr, &err); > + fd = socket_connect(&addr, NULL, &err); > g_assert_cmpint(fd, ==, -1); > error_free_or_abort(&err); > > @@ -172,7 +172,7 @@ static void test_socket_fd_pass_num_good(void) > addr.type = SOCKET_ADDRESS_TYPE_FD; > addr.u.fd.str = g_strdup_printf("%d", sfd); > > - fd = socket_connect(&addr, &error_abort); > + fd = socket_connect(&addr, NULL, &error_abort); > g_assert_cmpint(fd, ==, sfd); > > fd = socket_listen(&addr, 1, &error_abort); > @@ -194,7 +194,7 @@ static void test_socket_fd_pass_num_bad(void) > addr.type = SOCKET_ADDRESS_TYPE_FD; > addr.u.fd.str = g_strdup_printf("%d", sfd); > > - fd = socket_connect(&addr, &err); > + fd = socket_connect(&addr, NULL, &err); > g_assert_cmpint(fd, ==, -1); > error_free_or_abort(&err); > > @@ -217,7 +217,7 @@ static void test_socket_fd_pass_num_nocli(void) > addr.type = SOCKET_ADDRESS_TYPE_FD; > addr.u.fd.str = g_strdup_printf("%d", STDOUT_FILENO); > > - fd = socket_connect(&addr, &err); > + fd = socket_connect(&addr, NULL, &err); > g_assert_cmpint(fd, ==, -1); > error_free_or_abort(&err); > > @@ -246,10 +246,10 @@ static gpointer unix_client_thread_func(gpointer user_data) > > for (i = 0; i < ABSTRACT_SOCKET_VARIANTS; i++) { > if (row->expect_connect[i]) { > - fd = socket_connect(row->client[i], &error_abort); > + fd = socket_connect(row->client[i], NULL, &error_abort); > g_assert_cmpint(fd, >=, 0); > } else { > - fd = socket_connect(row->client[i], &err); > + fd = socket_connect(row->client[i], NULL, &err); > g_assert_cmpint(fd, ==, -1); > error_free_or_abort(&err); > } I'd expect something added to the test suite to exercise the new codepath. Obviously we'd be limted to dealing with 127.0.0.1, but it can at least run the code paths. > diff --git a/util/qemu-sockets.c b/util/qemu-sockets.c > index 13b5b197f9..491e2f2bc9 100644 > --- a/util/qemu-sockets.c > +++ b/util/qemu-sockets.c > @@ -358,8 +358,10 @@ listen_ok: > ((rc) == -EINPROGRESS) > #endif > > -static int inet_connect_addr(const InetSocketAddress *saddr, > - struct addrinfo *addr, Error **errp) > +static int inet_connect_addr(const InetSocketAddress *dst_addr, > + const InetSocketAddress *src_addr, > + struct addrinfo *addr, struct addrinfo *saddr, > + Error **errp) > { > int sock, rc; > > @@ -371,8 +373,28 @@ static int inet_connect_addr(const InetSocketAddress *saddr, > } > socket_set_fast_reuse(sock); > > + /* to bind the socket if src_addr is available */ > + > + if (src_addr) { > + struct sockaddr_in servaddr; > + > + /* bind to a specific interface in the internet domain */ > + /* to make sure the sin_zero filed is cleared */ > + memset(&servaddr, 0, sizeof(servaddr)); > + > + servaddr.sin_family = AF_INET; > + servaddr.sin_addr.s_addr = inet_addr(src_addr->host); My feedback from the previous posting has been ignored. This code is broken for IPv6. Never call the IPv4-only APIs, getaddrinfo is the only way to get a 'struct sockaddr *' in a protocol portable manner. > + servaddr.sin_port = 0; > + > + if (bind(sock, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) { > + error_setg_errno(errp, errno, "Failed to bind socket"); > + return -1; > + } > + } > + > /* connect to peer */ > do { > + Arbitrary whitespace change should be removed > rc = 0; > if (connect(sock, addr->ai_addr, addr->ai_addrlen) < 0) { > rc = -errno; > @@ -380,8 +402,14 @@ static int inet_connect_addr(const InetSocketAddress *saddr, > @@ -446,41 +474,55 @@ static struct addrinfo *inet_parse_connect_saddr(InetSocketAddress *saddr, > * > * Returns: -1 on error, file descriptor on success. > */ > -int inet_connect_saddr(InetSocketAddress *saddr, Error **errp) > +int inet_connect_saddr(InetSocketAddress *dst_addr, > + InetSocketAddress *src_addr, Error **errp) > { > Error *local_err = NULL; > - struct addrinfo *res, *e; > + struct addrinfo *res_d, *res_s, *e, *x; > int sock = -1; > > - res = inet_parse_connect_saddr(saddr, errp); > - if (!res) { > + res_d = inet_parse_connect_saddr(dst_addr, errp); > + if (!res_d) { > return -1; > } > > - for (e = res; e != NULL; e = e->ai_next) { > + if (src_addr) { > + res_s = inet_parse_connect_saddr(src_addr, errp); > + if (!res_s) { > + return -1; > + } > + } > + > + for (e = res_d; e != NULL; e = e->ai_next) { > error_free(local_err); > local_err = NULL; > > #ifdef HAVE_IPPROTO_MPTCP > - if (saddr->has_mptcp && saddr->mptcp) { > + if (dst_addr->has_mptcp && dst_addr->mptcp) { > e->ai_protocol = IPPROTO_MPTCP; > } > #endif > + for (x = res_s; x != NULL; x = x->ai_next) { > + if (!src_addr || e->ai_family == x->ai_family) { > > - sock = inet_connect_addr(saddr, e, &local_err); > - if (sock >= 0) { > - break; > + sock = inet_connect_addr(dst_addr, src_addr, > + e, x, &local_err); > + if (sock >= 0) { > + break; > + } > + } > } > } If the ai_family for the src is different from ai_family for the dst, this loop will never call inet_connect_addr at all, and leave local_err unset, and so the error_propagate call below will have no error message to propagate. > > - freeaddrinfo(res); > + freeaddrinfo(res_d); > + freeaddrinfo(res_s); > > if (sock < 0) { > error_propagate(errp, local_err); > return sock; > } > > - if (saddr->keep_alive) { > + if (dst_addr->keep_alive) { > int val = 1; > int ret = setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, > &val, sizeof(val)); > @@ -506,7 +548,7 @@ static int inet_dgram_saddr(InetSocketAddress *sraddr, > Error *err = NULL; > > /* lookup peer addr */ > - memset(&ai,0, sizeof(ai)); > + memset(&ai,0,sizeof(ai)); Unrelated whitespace change. > ai.ai_flags = AI_CANONNAME | AI_V4MAPPED | AI_ADDRCONFIG; > ai.ai_family = inet_ai_family_from_address(sraddr, &err); > ai.ai_socktype = SOCK_DGRAM; > @@ -727,7 +769,7 @@ int inet_connect(const char *str, Error **errp) > InetSocketAddress *addr = g_new(InetSocketAddress, 1); > > if (!inet_parse(addr, str, errp)) { > - sock = inet_connect_saddr(addr, errp); > + sock = inet_connect_saddr(addr, NULL, errp); > } > qapi_free_InetSocketAddress(addr); > return sock; With regards, Daniel
On 26/07/22 4:14 pm, Daniel P. Berrangé wrote: > In $SUBJECT s/multifd:/io:/ as this is not migration related. > > On Thu, Jul 21, 2022 at 07:56:18PM +0000, Het Gala wrote: >> i) Binding of the socket to source ip address and port on the non-default >> interface has been implemented for multi-FD connection, which was not >> necessary earlier because the binding was on the default interface itself. >> >> ii) Created an end to end connection between all multi-FD source and >> destination pairs. >> >> Suggested-by: Manish Mishra <manish.mishra@nutanix.com> >> Signed-off-by: Het Gala <het.gala@nutanix.com> >> --- >> include/io/channel-socket.h | 44 ++++++++++++++++ >> include/qemu/sockets.h | 6 ++- >> io/channel-socket.c | 93 ++++++++++++++++++++++++++-------- >> migration/socket.c | 4 +- >> tests/unit/test-util-sockets.c | 16 +++--- >> util/qemu-sockets.c | 90 +++++++++++++++++++++++--------- >> 6 files changed, 196 insertions(+), 57 deletions(-) >> >> diff --git a/include/io/channel-socket.h b/include/io/channel-socket.h >> index 513c428fe4..8168866b06 100644 >> --- a/include/io/channel-socket.h >> +++ b/include/io/channel-socket.h >> @@ -211,6 +211,50 @@ void qio_channel_socket_dgram_async(QIOChannelSocket *ioc, >> GMainContext *context); >> >> >> +/** >> + * qio_channel_socket_connect_all_sync: > This needs to be called qio_channel_socket_connect_full_sync to > match the naming conventions in use in IO code. > Sorry Daniel, will definitely update this in the coming patchset for sure. >> + * @ioc: the socket channel object >> + * @dst_addr: the destination address to connect to >> + * @src_addr: the source address to be connected > 'the optional source address to bind to' > Sure, acknowledged. >> + * @errp: pointer to a NULL-initialized error object >> + * >> + * Attempt to connect to the address @dst_addr with @src_addr. > * Attempt to connect to the address @dst_addr. If @src_addr > * is non-NULL, it will be bound to in order to control outbound > * interface routing. > > >> + * This method will run in the foreground so the caller will not >> + * regain execution control until the connection is established or >> + * an error occurs. >> + */ >> +int qio_channel_socket_connect_all_sync(QIOChannelSocket *ioc, >> + SocketAddress *dst_addr, >> + SocketAddress *src_addr, >> + Error **errp); > Vertical mis-alignment of parameters > Acknowledged. >> + >> +/** >> + * qio_channel_socket_connect_all_async: > Needs to be qio_channel_socket_connect_full_async > Acknowledged. Sorry for such nit errors. Will update them in next patchset >> + * @ioc: the socket channel object >> + * @dst_addr: the destination address to connect to > @src_addr needs to be placed here... > Acknowledged. >> + * @callback: the function to invoke on completion >> + * @opaque: user data to pass to @callback >> + * @destroy: the function to free @opaque >> + * @context: the context to run the async task. If %NULL, the default >> + * context will be used. >> + * @src_addr: the source address to be connected > ...not here > > and same note about the docs comment > Acknowledged >> + * >> + * Attempt to connect to the address @dst_addr with the @src_addr. > Same note about the docs comment > Acknowledged. > >> + * This method will run in the background so the caller will regain >> + * execution control immediately. The function @callback >> + * will be invoked on completion or failure. The @dst_addr >> + * parameter will be copied, so may be freed as soon >> + * as this function returns without waiting for completion. >> + */ >> +void qio_channel_socket_connect_all_async(QIOChannelSocket *ioc, >> + SocketAddress *dst_addr, >> + QIOTaskFunc callback, >> + gpointer opaque, >> + GDestroyNotify destroy, >> + GMainContext *context, >> + SocketAddress *src_addr); >> + >> + >> /** >> * qio_channel_socket_get_local_address: >> * @ioc: the socket channel object >> >> >> >> >> >> diff --git a/migration/socket.c b/migration/socket.c >> index dab872a0fe..69fda774ba 100644 >> --- a/migration/socket.c >> +++ b/migration/socket.c >> @@ -57,8 +57,8 @@ int outgoing_param_total_multifds(void) >> void socket_send_channel_create(QIOTaskFunc f, void *data) >> { >> QIOChannelSocket *sioc = qio_channel_socket_new(); >> - qio_channel_socket_connect_async(sioc, outgoing_args.saddr, >> - f, data, NULL, NULL); >> + qio_channel_socket_connect_all_async(sioc, outgoing_args.saddr, >> + f, data, NULL, NULL, NULL); >> } > Don't change this call at all until the next patch which actually > needs to pass a non-NULL parameter for src. > Sure, acknowledged. >> QIOChannel *socket_send_channel_create_sync(Error **errp) >> diff --git a/tests/unit/test-util-sockets.c b/tests/unit/test-util-sockets.c >> index 63909ccb2b..aa26630045 100644 >> --- a/tests/unit/test-util-sockets.c >> +++ b/tests/unit/test-util-sockets.c >> @@ -89,7 +89,7 @@ static void test_socket_fd_pass_name_good(void) >> addr.type = SOCKET_ADDRESS_TYPE_FD; >> addr.u.fd.str = g_strdup(mon_fdname); >> >> - fd = socket_connect(&addr, &error_abort); >> + fd = socket_connect(&addr, NULL, &error_abort); >> g_assert_cmpint(fd, !=, -1); >> g_assert_cmpint(fd, !=, mon_fd); >> close(fd); >> @@ -121,7 +121,7 @@ static void test_socket_fd_pass_name_bad(void) >> addr.type = SOCKET_ADDRESS_TYPE_FD; >> addr.u.fd.str = g_strdup(mon_fdname); >> >> - fd = socket_connect(&addr, &err); >> + fd = socket_connect(&addr, NULL, &err); >> g_assert_cmpint(fd, ==, -1); >> error_free_or_abort(&err); >> >> @@ -148,7 +148,7 @@ static void test_socket_fd_pass_name_nomon(void) >> addr.type = SOCKET_ADDRESS_TYPE_FD; >> addr.u.fd.str = g_strdup("myfd"); >> >> - fd = socket_connect(&addr, &err); >> + fd = socket_connect(&addr, NULL, &err); >> g_assert_cmpint(fd, ==, -1); >> error_free_or_abort(&err); >> >> @@ -172,7 +172,7 @@ static void test_socket_fd_pass_num_good(void) >> addr.type = SOCKET_ADDRESS_TYPE_FD; >> addr.u.fd.str = g_strdup_printf("%d", sfd); >> >> - fd = socket_connect(&addr, &error_abort); >> + fd = socket_connect(&addr, NULL, &error_abort); >> g_assert_cmpint(fd, ==, sfd); >> >> fd = socket_listen(&addr, 1, &error_abort); >> @@ -194,7 +194,7 @@ static void test_socket_fd_pass_num_bad(void) >> addr.type = SOCKET_ADDRESS_TYPE_FD; >> addr.u.fd.str = g_strdup_printf("%d", sfd); >> >> - fd = socket_connect(&addr, &err); >> + fd = socket_connect(&addr, NULL, &err); >> g_assert_cmpint(fd, ==, -1); >> error_free_or_abort(&err); >> >> @@ -217,7 +217,7 @@ static void test_socket_fd_pass_num_nocli(void) >> addr.type = SOCKET_ADDRESS_TYPE_FD; >> addr.u.fd.str = g_strdup_printf("%d", STDOUT_FILENO); >> >> - fd = socket_connect(&addr, &err); >> + fd = socket_connect(&addr, NULL, &err); >> g_assert_cmpint(fd, ==, -1); >> error_free_or_abort(&err); >> >> @@ -246,10 +246,10 @@ static gpointer unix_client_thread_func(gpointer user_data) >> >> for (i = 0; i < ABSTRACT_SOCKET_VARIANTS; i++) { >> if (row->expect_connect[i]) { >> - fd = socket_connect(row->client[i], &error_abort); >> + fd = socket_connect(row->client[i], NULL, &error_abort); >> g_assert_cmpint(fd, >=, 0); >> } else { >> - fd = socket_connect(row->client[i], &err); >> + fd = socket_connect(row->client[i], NULL, &err); >> g_assert_cmpint(fd, ==, -1); >> error_free_or_abort(&err); >> } > I'd expect something added to the test suite to exercise the new > codepath. Obviously we'd be limted to dealing with 127.0.0.1, > but it can at least run the code paths. > Sure Daniel. Will add some test cases from the coming v3 patchset series. >> diff --git a/util/qemu-sockets.c b/util/qemu-sockets.c >> index 13b5b197f9..491e2f2bc9 100644 >> --- a/util/qemu-sockets.c >> +++ b/util/qemu-sockets.c >> @@ -358,8 +358,10 @@ listen_ok: >> ((rc) == -EINPROGRESS) >> #endif >> >> -static int inet_connect_addr(const InetSocketAddress *saddr, >> - struct addrinfo *addr, Error **errp) >> +static int inet_connect_addr(const InetSocketAddress *dst_addr, >> + const InetSocketAddress *src_addr, >> + struct addrinfo *addr, struct addrinfo *saddr, >> + Error **errp) >> { >> int sock, rc; >> >> @@ -371,8 +373,28 @@ static int inet_connect_addr(const InetSocketAddress *saddr, >> } >> socket_set_fast_reuse(sock); >> >> + /* to bind the socket if src_addr is available */ >> + >> + if (src_addr) { >> + struct sockaddr_in servaddr; >> + >> + /* bind to a specific interface in the internet domain */ >> + /* to make sure the sin_zero filed is cleared */ >> + memset(&servaddr, 0, sizeof(servaddr)); >> + >> + servaddr.sin_family = AF_INET; >> + servaddr.sin_addr.s_addr = inet_addr(src_addr->host); > My feedback from the previous posting has been ignored. This code is > broken for IPv6. Never call the IPv4-only APIs, getaddrinfo is the > only way to get a 'struct sockaddr *' in a protocol portable manner. > Sorry Daniel, my appologies. I certainly misunderstood your point in the previous patchset. I thought this post was in sync with the ai_family check inet_connect_saddr function, and we wanted the src_addr to be called for getaddrinfo function in that context. But, I get your point now. I will surely update this in the v3 patchset series. >> + servaddr.sin_port = 0; >> + >> + if (bind(sock, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) { >> + error_setg_errno(errp, errno, "Failed to bind socket"); >> + return -1; >> + } >> + } >> + >> /* connect to peer */ >> do { >> + > Arbitrary whitespace change should be removed > >> rc = 0; >> if (connect(sock, addr->ai_addr, addr->ai_addrlen) < 0) { >> rc = -errno; >> @@ -380,8 +402,14 @@ static int inet_connect_addr(const InetSocketAddress *saddr, >> @@ -446,41 +474,55 @@ static struct addrinfo *inet_parse_connect_saddr(InetSocketAddress *saddr, >> * >> * Returns: -1 on error, file descriptor on success. >> */ >> -int inet_connect_saddr(InetSocketAddress *saddr, Error **errp) >> +int inet_connect_saddr(InetSocketAddress *dst_addr, >> + InetSocketAddress *src_addr, Error **errp) >> { >> Error *local_err = NULL; >> - struct addrinfo *res, *e; >> + struct addrinfo *res_d, *res_s, *e, *x; >> int sock = -1; >> >> - res = inet_parse_connect_saddr(saddr, errp); >> - if (!res) { >> + res_d = inet_parse_connect_saddr(dst_addr, errp); >> + if (!res_d) { >> return -1; >> } >> >> - for (e = res; e != NULL; e = e->ai_next) { >> + if (src_addr) { >> + res_s = inet_parse_connect_saddr(src_addr, errp); >> + if (!res_s) { >> + return -1; >> + } >> + } >> + >> + for (e = res_d; e != NULL; e = e->ai_next) { >> error_free(local_err); >> local_err = NULL; >> >> #ifdef HAVE_IPPROTO_MPTCP >> - if (saddr->has_mptcp && saddr->mptcp) { >> + if (dst_addr->has_mptcp && dst_addr->mptcp) { >> e->ai_protocol = IPPROTO_MPTCP; >> } >> #endif >> + for (x = res_s; x != NULL; x = x->ai_next) { >> + if (!src_addr || e->ai_family == x->ai_family) { >> >> - sock = inet_connect_addr(saddr, e, &local_err); >> - if (sock >= 0) { >> - break; >> + sock = inet_connect_addr(dst_addr, src_addr, >> + e, x, &local_err); >> + if (sock >= 0) { >> + break; >> + } >> + } >> } >> } > If the ai_family for the src is different from ai_family for > the dst, this loop will never call inet_connect_addr at all, > and leave local_err unset, and so the error_propagate call > below will have no error message to propagate. > Yes, you are right, so we need to check and have a error placed here, in-case if it never calls inet_connect_addr func, then we should print an error statement there right. Thankyou Daniel for pointing this out. >> >> - freeaddrinfo(res); >> + freeaddrinfo(res_d); >> + freeaddrinfo(res_s); >> >> if (sock < 0) { >> error_propagate(errp, local_err); >> return sock; >> } >> >> - if (saddr->keep_alive) { >> + if (dst_addr->keep_alive) { >> int val = 1; >> int ret = setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, >> &val, sizeof(val)); >> @@ -506,7 +548,7 @@ static int inet_dgram_saddr(InetSocketAddress *sraddr, >> Error *err = NULL; >> >> /* lookup peer addr */ >> - memset(&ai,0, sizeof(ai)); >> + memset(&ai,0,sizeof(ai)); > Unrelated whitespace change. > Acknowledged. >> ai.ai_flags = AI_CANONNAME | AI_V4MAPPED | AI_ADDRCONFIG; >> ai.ai_family = inet_ai_family_from_address(sraddr, &err); >> ai.ai_socktype = SOCK_DGRAM; >> @@ -727,7 +769,7 @@ int inet_connect(const char *str, Error **errp) >> InetSocketAddress *addr = g_new(InetSocketAddress, 1); >> >> if (!inet_parse(addr, str, errp)) { >> - sock = inet_connect_saddr(addr, errp); >> + sock = inet_connect_saddr(addr, NULL, errp); >> } >> qapi_free_InetSocketAddress(addr); >> return sock; > With regards, > Daniel With regards, Het Gala
diff --git a/include/io/channel-socket.h b/include/io/channel-socket.h index 513c428fe4..8168866b06 100644 --- a/include/io/channel-socket.h +++ b/include/io/channel-socket.h @@ -211,6 +211,50 @@ void qio_channel_socket_dgram_async(QIOChannelSocket *ioc, GMainContext *context); +/** + * qio_channel_socket_connect_all_sync: + * @ioc: the socket channel object + * @dst_addr: the destination address to connect to + * @src_addr: the source address to be connected + * @errp: pointer to a NULL-initialized error object + * + * Attempt to connect to the address @dst_addr with @src_addr. + * This method will run in the foreground so the caller will not + * regain execution control until the connection is established or + * an error occurs. + */ +int qio_channel_socket_connect_all_sync(QIOChannelSocket *ioc, + SocketAddress *dst_addr, + SocketAddress *src_addr, + Error **errp); + +/** + * qio_channel_socket_connect_all_async: + * @ioc: the socket channel object + * @dst_addr: the destination address to connect to + * @callback: the function to invoke on completion + * @opaque: user data to pass to @callback + * @destroy: the function to free @opaque + * @context: the context to run the async task. If %NULL, the default + * context will be used. + * @src_addr: the source address to be connected + * + * Attempt to connect to the address @dst_addr with the @src_addr. + * This method will run in the background so the caller will regain + * execution control immediately. The function @callback + * will be invoked on completion or failure. The @dst_addr + * parameter will be copied, so may be freed as soon + * as this function returns without waiting for completion. + */ +void qio_channel_socket_connect_all_async(QIOChannelSocket *ioc, + SocketAddress *dst_addr, + QIOTaskFunc callback, + gpointer opaque, + GDestroyNotify destroy, + GMainContext *context, + SocketAddress *src_addr); + + /** * qio_channel_socket_get_local_address: * @ioc: the socket channel object diff --git a/include/qemu/sockets.h b/include/qemu/sockets.h index 038faa157f..dc863c3df8 100644 --- a/include/qemu/sockets.h +++ b/include/qemu/sockets.h @@ -33,7 +33,8 @@ int inet_ai_family_from_address(InetSocketAddress *addr, Error **errp); int inet_parse(InetSocketAddress *addr, const char *str, Error **errp); int inet_connect(const char *str, Error **errp); -int inet_connect_saddr(InetSocketAddress *saddr, Error **errp); +int inet_connect_saddr(InetSocketAddress *dst_addr, + InetSocketAddress *src_addr, Error **errp); NetworkAddressFamily inet_netfamily(int family); @@ -41,7 +42,8 @@ int unix_listen(const char *path, Error **errp); int unix_connect(const char *path, Error **errp); SocketAddress *socket_parse(const char *str, Error **errp); -int socket_connect(SocketAddress *addr, Error **errp); +int socket_connect(SocketAddress *dst_addr, SocketAddress *src_addr, + Error **errp); int socket_listen(SocketAddress *addr, int num, Error **errp); void socket_listen_cleanup(int fd, Error **errp); int socket_dgram(SocketAddress *remote, SocketAddress *local, Error **errp); diff --git a/io/channel-socket.c b/io/channel-socket.c index 74a936cc1f..142298469b 100644 --- a/io/channel-socket.c +++ b/io/channel-socket.c @@ -144,14 +144,15 @@ qio_channel_socket_new_fd(int fd, } -int qio_channel_socket_connect_sync(QIOChannelSocket *ioc, - SocketAddress *addr, - Error **errp) +int qio_channel_socket_connect_all_sync(QIOChannelSocket *ioc, + SocketAddress *dst_addr, + SocketAddress *src_addr, + Error **errp) { int fd; - trace_qio_channel_socket_connect_sync(ioc, addr); - fd = socket_connect(addr, errp); + trace_qio_channel_socket_connect_sync(ioc, dst_addr); + fd = socket_connect(dst_addr, src_addr, errp); if (fd < 0) { trace_qio_channel_socket_connect_fail(ioc); return -1; @@ -177,19 +178,78 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc, } -static void qio_channel_socket_connect_worker(QIOTask *task, - gpointer opaque) +struct ConnectData { + SocketAddress *dst_addr; + SocketAddress *src_addr; +}; + + +static void qio_channel_socket_all_worker_free(gpointer opaque) +{ + struct ConnectData *data = opaque; + if (!data) { + return; + } + qapi_free_SocketAddress(data->dst_addr); + qapi_free_SocketAddress(data->src_addr); + g_free(data); +} + +static void qio_channel_socket_connect_all_worker(QIOTask *task, + gpointer opaque) { QIOChannelSocket *ioc = QIO_CHANNEL_SOCKET(qio_task_get_source(task)); - SocketAddress *addr = opaque; + struct ConnectData *data = opaque; Error *err = NULL; - qio_channel_socket_connect_sync(ioc, addr, &err); + qio_channel_socket_connect_all_sync(ioc, data->dst_addr, + data->src_addr, &err); qio_task_set_error(task, err); } +void qio_channel_socket_connect_all_async(QIOChannelSocket *ioc, + SocketAddress *dst_addr, + QIOTaskFunc callback, + gpointer opaque, + GDestroyNotify destroy, + GMainContext *context, + SocketAddress *src_addr) +{ + QIOTask *task = qio_task_new( + OBJECT(ioc), callback, opaque, destroy); + struct ConnectData *data = g_new0(struct ConnectData, 1); + + data->dst_addr = QAPI_CLONE(SocketAddress, dst_addr); + if (src_addr) { + data->src_addr = QAPI_CLONE(SocketAddress, src_addr); + } else { + data->src_addr = NULL; + } + /* + * socket_connect() does a non-blocking connect(), but it + * still blocks in DNS lookups, so we must use a thread + */ + trace_qio_channel_socket_connect_async(ioc, dst_addr); + qio_task_run_in_thread(task, + qio_channel_socket_connect_all_worker, + data, + qio_channel_socket_all_worker_free, + context); +} + + +int qio_channel_socket_connect_sync(QIOChannelSocket *ioc, + SocketAddress *addr, + Error **errp) +{ + qio_channel_socket_connect_all_sync(ioc, addr, NULL, errp); + + return 0; +} + + void qio_channel_socket_connect_async(QIOChannelSocket *ioc, SocketAddress *addr, QIOTaskFunc callback, @@ -197,20 +257,9 @@ void qio_channel_socket_connect_async(QIOChannelSocket *ioc, GDestroyNotify destroy, GMainContext *context) { - QIOTask *task = qio_task_new( - OBJECT(ioc), callback, opaque, destroy); - SocketAddress *addrCopy; - - addrCopy = QAPI_CLONE(SocketAddress, addr); + qio_channel_socket_connect_all_async(ioc, addr, callback, opaque, + destroy, context, NULL); - /* socket_connect() does a non-blocking connect(), but it - * still blocks in DNS lookups, so we must use a thread */ - trace_qio_channel_socket_connect_async(ioc, addr); - qio_task_run_in_thread(task, - qio_channel_socket_connect_worker, - addrCopy, - (GDestroyNotify)qapi_free_SocketAddress, - context); } diff --git a/migration/socket.c b/migration/socket.c index dab872a0fe..69fda774ba 100644 --- a/migration/socket.c +++ b/migration/socket.c @@ -57,8 +57,8 @@ int outgoing_param_total_multifds(void) void socket_send_channel_create(QIOTaskFunc f, void *data) { QIOChannelSocket *sioc = qio_channel_socket_new(); - qio_channel_socket_connect_async(sioc, outgoing_args.saddr, - f, data, NULL, NULL); + qio_channel_socket_connect_all_async(sioc, outgoing_args.saddr, + f, data, NULL, NULL, NULL); } QIOChannel *socket_send_channel_create_sync(Error **errp) diff --git a/tests/unit/test-util-sockets.c b/tests/unit/test-util-sockets.c index 63909ccb2b..aa26630045 100644 --- a/tests/unit/test-util-sockets.c +++ b/tests/unit/test-util-sockets.c @@ -89,7 +89,7 @@ static void test_socket_fd_pass_name_good(void) addr.type = SOCKET_ADDRESS_TYPE_FD; addr.u.fd.str = g_strdup(mon_fdname); - fd = socket_connect(&addr, &error_abort); + fd = socket_connect(&addr, NULL, &error_abort); g_assert_cmpint(fd, !=, -1); g_assert_cmpint(fd, !=, mon_fd); close(fd); @@ -121,7 +121,7 @@ static void test_socket_fd_pass_name_bad(void) addr.type = SOCKET_ADDRESS_TYPE_FD; addr.u.fd.str = g_strdup(mon_fdname); - fd = socket_connect(&addr, &err); + fd = socket_connect(&addr, NULL, &err); g_assert_cmpint(fd, ==, -1); error_free_or_abort(&err); @@ -148,7 +148,7 @@ static void test_socket_fd_pass_name_nomon(void) addr.type = SOCKET_ADDRESS_TYPE_FD; addr.u.fd.str = g_strdup("myfd"); - fd = socket_connect(&addr, &err); + fd = socket_connect(&addr, NULL, &err); g_assert_cmpint(fd, ==, -1); error_free_or_abort(&err); @@ -172,7 +172,7 @@ static void test_socket_fd_pass_num_good(void) addr.type = SOCKET_ADDRESS_TYPE_FD; addr.u.fd.str = g_strdup_printf("%d", sfd); - fd = socket_connect(&addr, &error_abort); + fd = socket_connect(&addr, NULL, &error_abort); g_assert_cmpint(fd, ==, sfd); fd = socket_listen(&addr, 1, &error_abort); @@ -194,7 +194,7 @@ static void test_socket_fd_pass_num_bad(void) addr.type = SOCKET_ADDRESS_TYPE_FD; addr.u.fd.str = g_strdup_printf("%d", sfd); - fd = socket_connect(&addr, &err); + fd = socket_connect(&addr, NULL, &err); g_assert_cmpint(fd, ==, -1); error_free_or_abort(&err); @@ -217,7 +217,7 @@ static void test_socket_fd_pass_num_nocli(void) addr.type = SOCKET_ADDRESS_TYPE_FD; addr.u.fd.str = g_strdup_printf("%d", STDOUT_FILENO); - fd = socket_connect(&addr, &err); + fd = socket_connect(&addr, NULL, &err); g_assert_cmpint(fd, ==, -1); error_free_or_abort(&err); @@ -246,10 +246,10 @@ static gpointer unix_client_thread_func(gpointer user_data) for (i = 0; i < ABSTRACT_SOCKET_VARIANTS; i++) { if (row->expect_connect[i]) { - fd = socket_connect(row->client[i], &error_abort); + fd = socket_connect(row->client[i], NULL, &error_abort); g_assert_cmpint(fd, >=, 0); } else { - fd = socket_connect(row->client[i], &err); + fd = socket_connect(row->client[i], NULL, &err); g_assert_cmpint(fd, ==, -1); error_free_or_abort(&err); } diff --git a/util/qemu-sockets.c b/util/qemu-sockets.c index 13b5b197f9..491e2f2bc9 100644 --- a/util/qemu-sockets.c +++ b/util/qemu-sockets.c @@ -358,8 +358,10 @@ listen_ok: ((rc) == -EINPROGRESS) #endif -static int inet_connect_addr(const InetSocketAddress *saddr, - struct addrinfo *addr, Error **errp) +static int inet_connect_addr(const InetSocketAddress *dst_addr, + const InetSocketAddress *src_addr, + struct addrinfo *addr, struct addrinfo *saddr, + Error **errp) { int sock, rc; @@ -371,8 +373,28 @@ static int inet_connect_addr(const InetSocketAddress *saddr, } socket_set_fast_reuse(sock); + /* to bind the socket if src_addr is available */ + + if (src_addr) { + struct sockaddr_in servaddr; + + /* bind to a specific interface in the internet domain */ + /* to make sure the sin_zero filed is cleared */ + memset(&servaddr, 0, sizeof(servaddr)); + + servaddr.sin_family = AF_INET; + servaddr.sin_addr.s_addr = inet_addr(src_addr->host); + servaddr.sin_port = 0; + + if (bind(sock, (struct sockaddr *)&servaddr, sizeof(servaddr)) < 0) { + error_setg_errno(errp, errno, "Failed to bind socket"); + return -1; + } + } + /* connect to peer */ do { + rc = 0; if (connect(sock, addr->ai_addr, addr->ai_addrlen) < 0) { rc = -errno; @@ -380,8 +402,14 @@ static int inet_connect_addr(const InetSocketAddress *saddr, } while (rc == -EINTR); if (rc < 0) { - error_setg_errno(errp, errno, "Failed to connect to '%s:%s'", - saddr->host, saddr->port); + if (src_addr) { + error_setg_errno(errp, errno, "Failed to connect '%s:%s' to " + "'%s:%s'", dst_addr->host, dst_addr->port, + src_addr->host, src_addr->port); + } else { + error_setg_errno(errp, errno, "Failed to connect '%s:%s'", + dst_addr->host, dst_addr->port); + } closesocket(sock); return -1; } @@ -446,41 +474,55 @@ static struct addrinfo *inet_parse_connect_saddr(InetSocketAddress *saddr, * * Returns: -1 on error, file descriptor on success. */ -int inet_connect_saddr(InetSocketAddress *saddr, Error **errp) +int inet_connect_saddr(InetSocketAddress *dst_addr, + InetSocketAddress *src_addr, Error **errp) { Error *local_err = NULL; - struct addrinfo *res, *e; + struct addrinfo *res_d, *res_s, *e, *x; int sock = -1; - res = inet_parse_connect_saddr(saddr, errp); - if (!res) { + res_d = inet_parse_connect_saddr(dst_addr, errp); + if (!res_d) { return -1; } - for (e = res; e != NULL; e = e->ai_next) { + if (src_addr) { + res_s = inet_parse_connect_saddr(src_addr, errp); + if (!res_s) { + return -1; + } + } + + for (e = res_d; e != NULL; e = e->ai_next) { error_free(local_err); local_err = NULL; #ifdef HAVE_IPPROTO_MPTCP - if (saddr->has_mptcp && saddr->mptcp) { + if (dst_addr->has_mptcp && dst_addr->mptcp) { e->ai_protocol = IPPROTO_MPTCP; } #endif + for (x = res_s; x != NULL; x = x->ai_next) { + if (!src_addr || e->ai_family == x->ai_family) { - sock = inet_connect_addr(saddr, e, &local_err); - if (sock >= 0) { - break; + sock = inet_connect_addr(dst_addr, src_addr, + e, x, &local_err); + if (sock >= 0) { + break; + } + } } } - freeaddrinfo(res); + freeaddrinfo(res_d); + freeaddrinfo(res_s); if (sock < 0) { error_propagate(errp, local_err); return sock; } - if (saddr->keep_alive) { + if (dst_addr->keep_alive) { int val = 1; int ret = setsockopt(sock, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)); @@ -506,7 +548,7 @@ static int inet_dgram_saddr(InetSocketAddress *sraddr, Error *err = NULL; /* lookup peer addr */ - memset(&ai,0, sizeof(ai)); + memset(&ai,0,sizeof(ai)); ai.ai_flags = AI_CANONNAME | AI_V4MAPPED | AI_ADDRCONFIG; ai.ai_family = inet_ai_family_from_address(sraddr, &err); ai.ai_socktype = SOCK_DGRAM; @@ -727,7 +769,7 @@ int inet_connect(const char *str, Error **errp) InetSocketAddress *addr = g_new(InetSocketAddress, 1); if (!inet_parse(addr, str, errp)) { - sock = inet_connect_saddr(addr, errp); + sock = inet_connect_saddr(addr, NULL, errp); } qapi_free_InetSocketAddress(addr); return sock; @@ -1182,25 +1224,27 @@ int socket_address_parse_named_fd(SocketAddress *addr, Error **errp) return 0; } -int socket_connect(SocketAddress *addr, Error **errp) +int socket_connect(SocketAddress *dst_addr, + SocketAddress *src_addr, Error **errp) { int fd; - switch (addr->type) { + switch (dst_addr->type) { case SOCKET_ADDRESS_TYPE_INET: - fd = inet_connect_saddr(&addr->u.inet, errp); + fd = inet_connect_saddr(&dst_addr->u.inet, src_addr ? + &src_addr->u.inet : NULL, errp); break; case SOCKET_ADDRESS_TYPE_UNIX: - fd = unix_connect_saddr(&addr->u.q_unix, errp); + fd = unix_connect_saddr(&dst_addr->u.q_unix, errp); break; case SOCKET_ADDRESS_TYPE_FD: - fd = socket_get_fd(addr->u.fd.str, errp); + fd = socket_get_fd(dst_addr->u.fd.str, errp); break; case SOCKET_ADDRESS_TYPE_VSOCK: - fd = vsock_connect_saddr(&addr->u.vsock, errp); + fd = vsock_connect_saddr(&dst_addr->u.vsock, errp); break; default:
i) Binding of the socket to source ip address and port on the non-default interface has been implemented for multi-FD connection, which was not necessary earlier because the binding was on the default interface itself. ii) Created an end to end connection between all multi-FD source and destination pairs. Suggested-by: Manish Mishra <manish.mishra@nutanix.com> Signed-off-by: Het Gala <het.gala@nutanix.com> --- include/io/channel-socket.h | 44 ++++++++++++++++ include/qemu/sockets.h | 6 ++- io/channel-socket.c | 93 ++++++++++++++++++++++++++-------- migration/socket.c | 4 +- tests/unit/test-util-sockets.c | 16 +++--- util/qemu-sockets.c | 90 +++++++++++++++++++++++--------- 6 files changed, 196 insertions(+), 57 deletions(-)