
[v4,2/3] QIOChannelSocket: Implement io_writev_zerocopy & io_flush_zerocopy for CONFIG_LINUX

Message ID 20211009075612.230283-3-leobras@redhat.com (mailing list archive)
State New, archived
Series MSG_ZEROCOPY for multifd

Commit Message

Leonardo Bras Oct. 9, 2021, 7:56 a.m. UTC
For CONFIG_LINUX, implement the new optional callbacks io_writev_zerocopy and
io_flush_zerocopy on QIOChannelSocket, but enable them only when the MSG_ZEROCOPY
feature is available in the host kernel, which is checked in
qio_channel_socket_connect_sync().

The contents of qio_channel_socket_writev() were moved to a helper function,
qio_channel_socket_writev_flags(), which accepts an extra argument for flags.
(This argument is passed directly to sendmsg().)

The above helper function is used to implement qio_channel_socket_writev(),
with flags = 0, keeping its behavior unchanged, and
qio_channel_socket_writev_zerocopy() with flags = MSG_ZEROCOPY.
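
The refactoring described above can be sketched outside of QEMU as a single
sendmsg() path that takes the flags as a parameter, with thin wrappers on top.
This is only an illustration: send_iov_flags(), send_iov() and
send_iov_zerocopy() are hypothetical names, not the functions from this patch,
and fd passing and error reporting are left out.

```c
#include <errno.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

/* Hypothetical helper: one sendmsg() path, parameterised by flags. */
static ssize_t send_iov_flags(int fd, const struct iovec *iov, size_t niov,
                              int flags)
{
    struct msghdr msg;
    ssize_t ret;

    memset(&msg, 0, sizeof(msg));
    msg.msg_iov = (struct iovec *)iov;  /* sendmsg() does not modify the iovec */
    msg.msg_iovlen = niov;

    /* Retry only when interrupted by a signal; other errors go to the caller. */
    do {
        ret = sendmsg(fd, &msg, flags);
    } while (ret < 0 && errno == EINTR);

    return ret;
}

/* Plain write: flags = 0, so behavior matches a direct sendmsg() call. */
static ssize_t send_iov(int fd, const struct iovec *iov, size_t niov)
{
    return send_iov_flags(fd, iov, niov, 0);
}

#ifdef MSG_ZEROCOPY
/* Zero-copy write: same path, only the flag differs. */
static ssize_t send_iov_zerocopy(int fd, const struct iovec *iov, size_t niov)
{
    return send_iov_flags(fd, iov, niov, MSG_ZEROCOPY);
}
#endif
```

Keeping both wrappers on one helper means the retry and error handling exist
in one place only, which is the point of the writev_flags() split.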

qio_channel_socket_flush_zerocopy() was implemented by counting how many times
sendmsg(...,MSG_ZEROCOPY) was successfully called, and then reading the
socket's error queue, in order to find how many of them finished sending.
Flush will loop until those counters are the same, or until some error occurs.
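
The counter bookkeeping described above can be sketched on its own, without a
socket. Each zero-copy notification read from the error queue reports an
inclusive range of completed sends (ee_info is the low end and ee_data the
high end, as 32-bit counters). The struct and function names below are
hypothetical stand-ins for the two fields this patch adds, not code from the
patch.

```c
#include <stdint.h>
#include <sys/types.h>

/* Hypothetical stand-in for zerocopy_queued / zerocopy_sent. */
struct zc_counters {
    ssize_t queued;  /* successful sendmsg(..., MSG_ZEROCOPY) calls */
    ssize_t sent;    /* sends the kernel has reported as finished */
};

/*
 * Account for one notification covering sends lo..hi, inclusive.
 * The subtraction is done in uint32_t so that a wrapped counter
 * (hi < lo) still gives the right number of completions.
 */
static void zc_account(struct zc_counters *c, uint32_t lo, uint32_t hi)
{
    uint32_t completed = (uint32_t)(hi - lo) + 1u;

    c->sent += (ssize_t)completed;
}

/* Flush has to keep reading the error queue while this returns nonzero. */
static int zc_flush_pending(const struct zc_counters *c)
{
    return c->sent < c->queued;
}
```

A single notification can cover several sends, which is why the loop compares
counters instead of counting notifications.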

A new function qio_channel_socket_poll() was also created in order to avoid
busy-looping recvmsg() in qio_channel_socket_flush_zerocopy() while waiting for
updates in the socket's error queue.

Notes on using writev_zerocopy():
1: Buffer
- As MSG_ZEROCOPY tells the kernel to use the same user buffer to avoid copying,
some caution is necessary to avoid overwriting any buffer before it's sent.
If this happens, a newer version of the buffer may be sent instead.
- If this is a problem, it's recommended to call flush_zerocopy() before freeing
or re-using the buffer.

2: Locked memory
- When using MSG_ZEROCOPY, the buffer memory will be locked after it is queued,
and unlocked after it's sent.
- Depending on the size of each buffer, and how often it's sent, it may require
a larger amount of locked memory than is usually available to a non-root user.
- If the required amount of locked memory is not available, writev_zerocopy
will return an error, which can abort an operation like migration.
- Because of this, when user code wants to add zerocopy as a feature, it
requires a mechanism to disable it, so it can still be accessible to less
privileged users.
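
As a rough illustration of the locked-memory note, a caller could look at
RLIMIT_MEMLOCK before deciding whether to turn zerocopy on. This is a sketch
under assumptions, not part of the patch: memlock_soft_limit() and
zerocopy_budget_ok() are hypothetical helpers, and the budget check is a
simple heuristic (buffer size times buffers in flight). The kernel accounts
locked memory in pages, so the real requirement may differ.

```c
#include <stddef.h>
#include <sys/resource.h>

/* Read the soft RLIMIT_MEMLOCK limit; returns 0 on success, -1 on error. */
static int memlock_soft_limit(rlim_t *out)
{
    struct rlimit rl;

    if (getrlimit(RLIMIT_MEMLOCK, &rl) != 0) {
        return -1;
    }
    *out = rl.rlim_cur;
    return 0;
}

/*
 * Hypothetical heuristic: does buf_size * in_flight fit under the limit?
 * Written as a division so the multiplication cannot overflow.
 */
static int zerocopy_budget_ok(rlim_t limit, size_t buf_size, size_t in_flight)
{
    if (limit == RLIM_INFINITY || buf_size == 0) {
        return 1;
    }
    return in_flight <= (size_t)limit / buf_size;
}
```

A check like this can only make the failure less surprising. The switch to
disable zerocopy is still needed, because sendmsg() may fail with ENOBUFS
even when the estimate looks fine.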

Signed-off-by: Leonardo Bras <leobras@redhat.com>
---
 include/io/channel-socket.h |   2 +
 include/io/channel.h        |   1 +
 io/channel-socket.c         | 180 ++++++++++++++++++++++++++++++++++--
 3 files changed, 173 insertions(+), 10 deletions(-)

Comments

Eric Blake Oct. 11, 2021, 7:27 p.m. UTC | #1
On Sat, Oct 09, 2021 at 04:56:12AM -0300, Leonardo Bras wrote:
> For CONFIG_LINUX, implement the new optional callbacks io_write_zerocopy and
> io_flush_zerocopy on QIOChannelSocket, but enables it only when MSG_ZEROCOPY
> feature is available in the host kernel, which is checked on
> qio_channel_socket_connect_sync()
> 
> qio_channel_socket_writev() contents were moved to a helper function
> qio_channel_socket_writev_flags() which accepts an extra argument for flags.
> (This argument is passed directly to sendmsg().
> 
> The above helper function is used to implement qio_channel_socket_writev(),
> with flags = 0, keeping it's behavior unchanged, and

its (remember, "it's" is shorthand for "it is", which does not fit here)

> qio_channel_socket_writev_zerocopy() with flags = MSG_ZEROCOPY.
> 
> qio_channel_socket_flush_zerocopy() was implemented by counting how many times
> sendmsg(...,MSG_ZEROCOPY) was sucessfully called, and then reading the
> socket's error queue, in order to find how many of them finished sending.
> Flush will loop until those counters are the same, or until some error ocurs.

occurs

> 
> A new function qio_channel_socket_poll() was also created in order to avoid
> busy-looping recvmsg() in qio_channel_socket_flush_zerocopy() while waiting for
> updates in socket's error queue.
> 
> Notes on using writev_zerocopy():
> 1: Buffer
> - As MSG_ZEROCOPY tells the kernel to use the same user buffer to avoid copying,
> some caution is necessary to avoid overwriting any buffer before it's sent.
> If something like this happen, a newer version of the buffer may be sent instead.
> - If this is a problem, it's recommended to call flush_zerocopy() before freeing
> or re-using the buffer.
> 
> 2: Locked memory
> - When using MSG_ZERCOCOPY, the buffer memory will be locked after queued, and
> unlocked after it's sent.
> - Depending on the size of each buffer, and how often it's sent, it may require
> a larger amount of locked memory than usually available to non-root user.
> - If the required amount of locked memory is not available, writev_zerocopy
> will return an error, which can abort an operation like migration,
> - Because of this, when an user code wants to add zerocopy as a feature, it
> requires a mechanism to disable it, so it can still be acessible to less
> privileged users.
> 
> Signed-off-by: Leonardo Bras <leobras@redhat.com>
> ---
>  include/io/channel-socket.h |   2 +
>  include/io/channel.h        |   1 +
>  io/channel-socket.c         | 180 ++++++++++++++++++++++++++++++++++--
>  3 files changed, 173 insertions(+), 10 deletions(-)
> 
> +static int qio_channel_socket_flush_zerocopy(QIOChannel *ioc,
> +                                             Error **errp)
> +{

> +
> +        /* No errors, count sucessfully finished sendmsg()*/

Space before */

> +        sioc->zerocopy_sent += serr->ee_data - serr->ee_info + 1;
> +    }
> +    return 0;
> +}
> +
> +#endif /* CONFIG_LINUX */
> +
>  static int
>  qio_channel_socket_set_blocking(QIOChannel *ioc,
>                                  bool enabled,
> @@ -787,6 +943,10 @@ static void qio_channel_socket_class_init(ObjectClass *klass,
>      ioc_klass->io_set_delay = qio_channel_socket_set_delay;
>      ioc_klass->io_create_watch = qio_channel_socket_create_watch;
>      ioc_klass->io_set_aio_fd_handler = qio_channel_socket_set_aio_fd_handler;
> +#ifdef CONFIG_LINUX
> +    ioc_klass->io_writev_zerocopy = qio_channel_socket_writev_zerocopy;
> +    ioc_klass->io_flush_zerocopy = qio_channel_socket_flush_zerocopy;
> +#endif
>  }

I did a high-level look at the code, rather than an in-depth review of
whether zero-copy was being used correctly.

Leonardo Bras Oct. 11, 2021, 7:44 p.m. UTC | #2
Hello Eric,

On Mon, Oct 11, 2021 at 4:28 PM Eric Blake <eblake@redhat.com> wrote:
>
> On Sat, Oct 09, 2021 at 04:56:12AM -0300, Leonardo Bras wrote:
> > For CONFIG_LINUX, implement the new optional callbacks io_write_zerocopy and
> > io_flush_zerocopy on QIOChannelSocket, but enables it only when MSG_ZEROCOPY
> > feature is available in the host kernel, which is checked on
> > qio_channel_socket_connect_sync()
> >
> > qio_channel_socket_writev() contents were moved to a helper function
> > qio_channel_socket_writev_flags() which accepts an extra argument for flags.
> > (This argument is passed directly to sendmsg().
> >
> > The above helper function is used to implement qio_channel_socket_writev(),
> > with flags = 0, keeping it's behavior unchanged, and
>
> its (remember, "it's" is shorthand for "it is", which does not fit here)
>
> > qio_channel_socket_writev_zerocopy() with flags = MSG_ZEROCOPY.
> >
> > qio_channel_socket_flush_zerocopy() was implemented by counting how many times
> > sendmsg(...,MSG_ZEROCOPY) was sucessfully called, and then reading the
> > socket's error queue, in order to find how many of them finished sending.
> > Flush will loop until those counters are the same, or until some error ocurs.
>
> occurs

Thanks!
(I will check if my codespell is enabled in this setup)

>
> >
> > A new function qio_channel_socket_poll() was also created in order to avoid
> > busy-looping recvmsg() in qio_channel_socket_flush_zerocopy() while waiting for
> > updates in socket's error queue.
> >
> > Notes on using writev_zerocopy():
> > 1: Buffer
> > - As MSG_ZEROCOPY tells the kernel to use the same user buffer to avoid copying,
> > some caution is necessary to avoid overwriting any buffer before it's sent.
> > If something like this happen, a newer version of the buffer may be sent instead.
> > - If this is a problem, it's recommended to call flush_zerocopy() before freeing
> > or re-using the buffer.
> >
> > 2: Locked memory
> > - When using MSG_ZERCOCOPY, the buffer memory will be locked after queued, and
> > unlocked after it's sent.
> > - Depending on the size of each buffer, and how often it's sent, it may require
> > a larger amount of locked memory than usually available to non-root user.
> > - If the required amount of locked memory is not available, writev_zerocopy
> > will return an error, which can abort an operation like migration,
> > - Because of this, when an user code wants to add zerocopy as a feature, it
> > requires a mechanism to disable it, so it can still be acessible to less
> > privileged users.
> >
> > Signed-off-by: Leonardo Bras <leobras@redhat.com>
> > ---
> >  include/io/channel-socket.h |   2 +
> >  include/io/channel.h        |   1 +
> >  io/channel-socket.c         | 180 ++++++++++++++++++++++++++++++++++--
> >  3 files changed, 173 insertions(+), 10 deletions(-)
> >
> > +static int qio_channel_socket_flush_zerocopy(QIOChannel *ioc,
> > +                                             Error **errp)
> > +{
>
> > +
> > +        /* No errors, count sucessfully finished sendmsg()*/
>
> Space before */

Thanks!
Also, typo on 'successfully'.

>
> > +        sioc->zerocopy_sent += serr->ee_data - serr->ee_info + 1;
> > +    }
> > +    return 0;
> > +}
> > +
> > +#endif /* CONFIG_LINUX */
> > +
> >  static int
> >  qio_channel_socket_set_blocking(QIOChannel *ioc,
> >                                  bool enabled,
> > @@ -787,6 +943,10 @@ static void qio_channel_socket_class_init(ObjectClass *klass,
> >      ioc_klass->io_set_delay = qio_channel_socket_set_delay;
> >      ioc_klass->io_create_watch = qio_channel_socket_create_watch;
> >      ioc_klass->io_set_aio_fd_handler = qio_channel_socket_set_aio_fd_handler;
> > +#ifdef CONFIG_LINUX
> > +    ioc_klass->io_writev_zerocopy = qio_channel_socket_writev_zerocopy;
> > +    ioc_klass->io_flush_zerocopy = qio_channel_socket_flush_zerocopy;
> > +#endif
> >  }
>
> I did a high-level look at the code, rather than an in-depth review of
> whether zero-copy was being used correctly.

It's so far been very helpful. Thanks!

Best regards,
Leo

Peter Xu Oct. 13, 2021, 6:18 a.m. UTC | #3
On Sat, Oct 09, 2021 at 04:56:12AM -0300, Leonardo Bras wrote:
> @@ -154,6 +161,17 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
>          return -1;
>      }
>  
> +#ifdef CONFIG_LINUX
> +    ret = qemu_setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v));
> +    if (ret < 0) {
> +        /* Zerocopy not available on host */
> +        return 0;
> +    }
> +
> +    qio_channel_set_feature(QIO_CHANNEL(ioc),
> +                            QIO_CHANNEL_FEATURE_WRITE_ZEROCOPY);

This is okay I think, but looks a bit weird.  Maybe nicer to be written as:

#if LINUX
      ret = setsockopt();
      if (ret == 0) {
          qio_channel_set_feature(...);
      }
#endif
      return 0;

?

> +#endif
> +
>      return 0;
>  }

[...]

> +static ssize_t qio_channel_socket_writev_zerocopy(QIOChannel *ioc,
> +                                                  const struct iovec *iov,
> +                                                  size_t niov,
> +                                                  Error **errp)
> +{
> +    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
> +    ssize_t ret;
> +
> +    ret = qio_channel_socket_writev_flags(ioc, iov, niov, NULL, 0,
> +                                          MSG_ZEROCOPY, errp);
> +    if (ret == QIO_CHANNEL_ERR_NOBUFS) {
> +        if (errp && *errp) {

Hmm this seems wrong, *errp should be NULL in most cases, meanwhile I think
error_setg*() takes care of errp==NULL too, so maybe we can drop this?

> +            error_setg_errno(errp, errno,
> +                             "Process can't lock enough memory for using MSG_ZEROCOPY");
> +        }
> +        return -1;
> +    }
> +
> +    sioc->zerocopy_queued++;
> +    return ret;
> +}

Leonardo Bras Oct. 27, 2021, 6:30 a.m. UTC | #4
On Wed, Oct 13, 2021 at 3:18 AM Peter Xu <peterx@redhat.com> wrote:
>
> On Sat, Oct 09, 2021 at 04:56:12AM -0300, Leonardo Bras wrote:
> > @@ -154,6 +161,17 @@ int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
> >          return -1;
> >      }
> >
> > +#ifdef CONFIG_LINUX
> > +    ret = qemu_setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v));
> > +    if (ret < 0) {
> > +        /* Zerocopy not available on host */
> > +        return 0;
> > +    }
> > +
> > +    qio_channel_set_feature(QIO_CHANNEL(ioc),
> > +                            QIO_CHANNEL_FEATURE_WRITE_ZEROCOPY);
>
> This is okay I think, but looks a bit weird.  Maybe nicer to be written as:
>
> #if LINUX
>       ret = setsockopt();
>       if (ret == 0) {
>           qio_channel_set_feature(...);
>       }
> #endif
>       return 0;
>
> ?

Yeah, I also questioned myself about this one.
At the time I ended up writing like this because the lines above used
the behavior "if error, then exit/abort", and so I thought that this
would be the better way to include this feature.
But I did not consider that this is not an error exit, but a 'maybe
feature' instead.

So, I will change that like you suggested.
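
The shape Peter suggested can be sketched outside of QEMU as follows: trying
to enable zerocopy either sets a capability bit or does nothing, and the
connection setup reports success either way. struct conn, conn_setup() and
try_enable_zerocopy() are hypothetical names for illustration only. The
SO_ZEROCOPY call is compiled in only when the system headers define it.

```c
#include <stdbool.h>
#include <sys/socket.h>

/* Hypothetical connection state with an optional-feature bit. */
struct conn {
    int fd;
    bool zerocopy;
};

/* Returns true only if the kernel accepted SO_ZEROCOPY on this socket. */
static bool try_enable_zerocopy(int fd)
{
#ifdef SO_ZEROCOPY
    int v = 1;

    return setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v)) == 0;
#else
    (void)fd;
    return false;  /* headers too old or not Linux: feature is absent */
#endif
}

/* A missing feature is not an error, so there is a single success exit. */
static int conn_setup(struct conn *c, int fd)
{
    c->fd = fd;
    c->zerocopy = false;

    if (try_enable_zerocopy(fd)) {
        c->zerocopy = true;
    }
    return 0;
}
```

With one return path, code added later after the feature probe still runs
when zerocopy is unavailable, which an early "return 0" would skip.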

>
> > +#endif
> > +
> >      return 0;
> >  }
>
> [...]
>
> > +static ssize_t qio_channel_socket_writev_zerocopy(QIOChannel *ioc,
> > +                                                  const struct iovec *iov,
> > +                                                  size_t niov,
> > +                                                  Error **errp)
> > +{
> > +    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
> > +    ssize_t ret;
> > +
> > +    ret = qio_channel_socket_writev_flags(ioc, iov, niov, NULL, 0,
> > +                                          MSG_ZEROCOPY, errp);
> > +    if (ret == QIO_CHANNEL_ERR_NOBUFS) {
> > +        if (errp && *errp) {
>
> Hmm this seems wrong, *errp should be NULL in most cases, meanwhile I think
> error_setg*() takes care of errp==NULL too, so maybe we can drop this?

Yeah, you are correct.
I ended up confused about how to use err, thanks for making it more clear!
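
To make the point about errp concrete, here is a toy model of the "Error **"
convention. It is not QEMU's Error API: Err and toy_error_set() are simplified
stand-ins written for this example. It assumes, as Peter describes, that the
setter itself tolerates errp == NULL and that *errp is still NULL when the
failure is first reported.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Toy error object; a stand-in, not QEMU's Error. */
typedef struct Err {
    char msg[128];
} Err;

/* Toy setter: does nothing when the caller passed errp == NULL. */
static void toy_error_set(Err **errp, const char *msg)
{
    Err *e;

    if (errp == NULL) {
        return;
    }
    e = calloc(1, sizeof(*e));
    if (e == NULL) {
        return;
    }
    snprintf(e->msg, sizeof(e->msg), "%s", msg);
    *errp = e;
}

/* v4 shape: the guard needs *errp to be set already, so nothing is set. */
static int toy_write_guarded(int nobufs, Err **errp)
{
    if (nobufs) {
        if (errp && *errp) {
            toy_error_set(errp, "can't lock memory");
        }
        return -1;
    }
    return 0;
}

/* Suggested shape: call the setter unconditionally. */
static int toy_write_unguarded(int nobufs, Err **errp)
{
    if (nobufs) {
        toy_error_set(errp, "can't lock memory");
        return -1;
    }
    return 0;
}
```

In the guarded version the caller gets -1 with no message at all, because no
error has been stored in *errp before the check.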

>
> > +            error_setg_errno(errp, errno,
> > +                             "Process can't lock enough memory for using MSG_ZEROCOPY");
> > +        }
> > +        return -1;
> > +    }
> > +
> > +    sioc->zerocopy_queued++;
> > +    return ret;
> > +}
>
> --
> Peter Xu
>

Best regards,
Leonardo Bras

Juan Quintela Nov. 2, 2021, 1:13 p.m. UTC | #5
Leonardo Bras <leobras@redhat.com> wrote:
> For CONFIG_LINUX, implement the new optional callbacks io_write_zerocopy and
> io_flush_zerocopy on QIOChannelSocket, but enables it only when MSG_ZEROCOPY
> feature is available in the host kernel, which is checked on
> qio_channel_socket_connect_sync()
>
> qio_channel_socket_writev() contents were moved to a helper function
> qio_channel_socket_writev_flags() which accepts an extra argument for flags.
> (This argument is passed directly to sendmsg().
>
> The above helper function is used to implement qio_channel_socket_writev(),
> with flags = 0, keeping it's behavior unchanged, and
> qio_channel_socket_writev_zerocopy() with flags = MSG_ZEROCOPY.
>
> qio_channel_socket_flush_zerocopy() was implemented by counting how many times
> sendmsg(...,MSG_ZEROCOPY) was sucessfully called, and then reading the
> socket's error queue, in order to find how many of them finished sending.
> Flush will loop until those counters are the same, or until some error ocurs.
>
> A new function qio_channel_socket_poll() was also created in order to avoid
> busy-looping recvmsg() in qio_channel_socket_flush_zerocopy() while waiting for
> updates in socket's error queue.
>
> Notes on using writev_zerocopy():
> 1: Buffer
> - As MSG_ZEROCOPY tells the kernel to use the same user buffer to avoid copying,
> some caution is necessary to avoid overwriting any buffer before it's sent.
> If something like this happen, a newer version of the buffer may be sent instead.
> - If this is a problem, it's recommended to call flush_zerocopy() before freeing
> or re-using the buffer.
>
> 2: Locked memory
> - When using MSG_ZERCOCOPY, the buffer memory will be locked after queued, and
> unlocked after it's sent.
> - Depending on the size of each buffer, and how often it's sent, it may require
> a larger amount of locked memory than usually available to non-root user.
> - If the required amount of locked memory is not available, writev_zerocopy
> will return an error, which can abort an operation like migration,
> - Because of this, when an user code wants to add zerocopy as a feature, it
> requires a mechanism to disable it, so it can still be acessible to less
> privileged users.
>
> Signed-off-by: Leonardo Bras <leobras@redhat.com>

I think this patch would be easier to review if you split it into:
- add the flags parameter left and right
- add the meat of what you do with the flags.

> +++ b/include/io/channel-socket.h
> @@ -47,6 +47,8 @@ struct QIOChannelSocket {
>      socklen_t localAddrLen;
>      struct sockaddr_storage remoteAddr;
>      socklen_t remoteAddrLen;
> +    ssize_t zerocopy_queued;
> +    ssize_t zerocopy_sent;

I am not sure whether it would be good or bad to put these inside

#ifdef CONFIG_LINUX

basically everything else uses it.

> +#ifdef CONFIG_LINUX
> +    ret = qemu_setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v));
> +    if (ret < 0) {
> +        /* Zerocopy not available on host */
> +        return 0;
> +    }
> +
> +    qio_channel_set_feature(QIO_CHANNEL(ioc),
> +                            QIO_CHANNEL_FEATURE_WRITE_ZEROCOPY);

As Peter said, you shouldn't fail if the feature is not there.

But on the other hand, in patch 3, you don't check that this feature
exists when you allow multifd_zerocopy to be enabled.

> +#endif
> +
>      return 0;
>  }


>          error_setg_errno(errp, errno,
>                           "Unable to write to socket");

Why do you split this in two lines?

Yes, I know that this file is not consistent either in how it does
this: sometimes one line, sometimes more.

I don't know how ZEROCOPY works at kernel level to comment on the rest of the
patch.

Later, Juan.

Leonardo Bras Nov. 3, 2021, 8:50 p.m. UTC | #6
On Tue, Nov 2, 2021 at 10:13 AM Juan Quintela <quintela@redhat.com> wrote:
>
> Leonardo Bras <leobras@redhat.com> wrote:
> > For CONFIG_LINUX, implement the new optional callbacks io_write_zerocopy and
> > io_flush_zerocopy on QIOChannelSocket, but enables it only when MSG_ZEROCOPY
> > feature is available in the host kernel, which is checked on
> > qio_channel_socket_connect_sync()
> >
> > qio_channel_socket_writev() contents were moved to a helper function
> > qio_channel_socket_writev_flags() which accepts an extra argument for flags.
> > (This argument is passed directly to sendmsg().
> >
> > The above helper function is used to implement qio_channel_socket_writev(),
> > with flags = 0, keeping it's behavior unchanged, and
> > qio_channel_socket_writev_zerocopy() with flags = MSG_ZEROCOPY.
> >
> > qio_channel_socket_flush_zerocopy() was implemented by counting how many times
> > sendmsg(...,MSG_ZEROCOPY) was sucessfully called, and then reading the
> > socket's error queue, in order to find how many of them finished sending.
> > Flush will loop until those counters are the same, or until some error ocurs.
> >
> > A new function qio_channel_socket_poll() was also created in order to avoid
> > busy-looping recvmsg() in qio_channel_socket_flush_zerocopy() while waiting for
> > updates in socket's error queue.
> >
> > Notes on using writev_zerocopy():
> > 1: Buffer
> > - As MSG_ZEROCOPY tells the kernel to use the same user buffer to avoid copying,
> > some caution is necessary to avoid overwriting any buffer before it's sent.
> > If something like this happen, a newer version of the buffer may be sent instead.
> > - If this is a problem, it's recommended to call flush_zerocopy() before freeing
> > or re-using the buffer.
> >
> > 2: Locked memory
> > - When using MSG_ZERCOCOPY, the buffer memory will be locked after queued, and
> > unlocked after it's sent.
> > - Depending on the size of each buffer, and how often it's sent, it may require
> > a larger amount of locked memory than usually available to non-root user.
> > - If the required amount of locked memory is not available, writev_zerocopy
> > will return an error, which can abort an operation like migration,
> > - Because of this, when an user code wants to add zerocopy as a feature, it
> > requires a mechanism to disable it, so it can still be acessible to less
> > privileged users.
> >
> > Signed-off-by: Leonardo Bras <leobras@redhat.com>
>
> I think this patch would be easier to review if you split in:
> - add the flags parameter left and right
> - add the meat of what you do with the flags.

ok, I will try to split it like this.

>
> > +++ b/include/io/channel-socket.h
> > @@ -47,6 +47,8 @@ struct QIOChannelSocket {
> >      socklen_t localAddrLen;
> >      struct sockaddr_storage remoteAddr;
> >      socklen_t remoteAddrLen;
> > +    ssize_t zerocopy_queued;
> > +    ssize_t zerocopy_sent;
>
> I am not sure if this is good/bad to put it inside
>
> #ifdef CONFIG_LINUX
>
> basically everything else uses it.

I think it makes sense that zerocopy_{sent,queued} is inside
CONFIG_LINUX as no one else is using zerocopy.

>
> > +#ifdef CONFIG_LINUX
> > +    ret = qemu_setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v));
> > +    if (ret < 0) {
> > +        /* Zerocopy not available on host */
> > +        return 0;
> > +    }
> > +
> > +    qio_channel_set_feature(QIO_CHANNEL(ioc),
> > +                            QIO_CHANNEL_FEATURE_WRITE_ZEROCOPY);
>
> As Peter said, you shouldn't fail if the feature is not there.
>
> But on the other hand, on patch 3, you don't check that this feature
> exist when you allow to enable multifd_zerocopy.

This had a major rework on v5, but I will make sure this suggestion is
addressed before releasing it.

>
> > +#endif
> > +
> >      return 0;
> >  }
>
>
> >          error_setg_errno(errp, errno,
> >                           "Unable to write to socket");
>
> Why do you split this in two lines?
>
> Yes, I know that this file is not consistent either on how the do with
> this, sometimes one line, otherwise more.

IIUC, these lines have no '+' in them, so they are not my addition.

>
> I don't know how ZEROCPY works at kernel level to comment on rest of the
> patch.
>
> Later, Juan.

Thanks for reviewing, Juan.

Best regards,
Leo

Patch

diff --git a/include/io/channel-socket.h b/include/io/channel-socket.h
index e747e63514..81d04baa4c 100644
--- a/include/io/channel-socket.h
+++ b/include/io/channel-socket.h
@@ -47,6 +47,8 @@  struct QIOChannelSocket {
     socklen_t localAddrLen;
     struct sockaddr_storage remoteAddr;
     socklen_t remoteAddrLen;
+    ssize_t zerocopy_queued;
+    ssize_t zerocopy_sent;
 };
 
 
diff --git a/include/io/channel.h b/include/io/channel.h
index e7d4e1521f..9d74629226 100644
--- a/include/io/channel.h
+++ b/include/io/channel.h
@@ -31,6 +31,7 @@  OBJECT_DECLARE_TYPE(QIOChannel, QIOChannelClass,
 
 
 #define QIO_CHANNEL_ERR_BLOCK -2
+#define QIO_CHANNEL_ERR_NOBUFS -3
 
 #define QIO_CHANNEL_WRITE_FLAG_ZEROCOPY 0x1
 
diff --git a/io/channel-socket.c b/io/channel-socket.c
index 606ec97cf7..6cc42057b2 100644
--- a/io/channel-socket.c
+++ b/io/channel-socket.c
@@ -26,6 +26,10 @@ 
 #include "io/channel-watch.h"
 #include "trace.h"
 #include "qapi/clone-visitor.h"
+#ifdef CONFIG_LINUX
+#include <linux/errqueue.h>
+#include <poll.h>
+#endif
 
 #define SOCKET_MAX_FDS 16
 
@@ -55,6 +59,8 @@  qio_channel_socket_new(void)
 
     sioc = QIO_CHANNEL_SOCKET(object_new(TYPE_QIO_CHANNEL_SOCKET));
     sioc->fd = -1;
+    sioc->zerocopy_queued = 0;
+    sioc->zerocopy_sent = 0;
 
     ioc = QIO_CHANNEL(sioc);
     qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
@@ -140,6 +146,7 @@  int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
                                     Error **errp)
 {
     int fd;
+    int ret, v = 1;
 
     trace_qio_channel_socket_connect_sync(ioc, addr);
     fd = socket_connect(addr, errp);
@@ -154,6 +161,17 @@  int qio_channel_socket_connect_sync(QIOChannelSocket *ioc,
         return -1;
     }
 
+#ifdef CONFIG_LINUX
+    ret = qemu_setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &v, sizeof(v));
+    if (ret < 0) {
+        /* Zerocopy not available on host */
+        return 0;
+    }
+
+    qio_channel_set_feature(QIO_CHANNEL(ioc),
+                            QIO_CHANNEL_FEATURE_WRITE_ZEROCOPY);
+#endif
+
     return 0;
 }
 
@@ -520,12 +538,13 @@  static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
     return ret;
 }
 
-static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
-                                         const struct iovec *iov,
-                                         size_t niov,
-                                         int *fds,
-                                         size_t nfds,
-                                         Error **errp)
+static ssize_t qio_channel_socket_writev_flags(QIOChannel *ioc,
+                                               const struct iovec *iov,
+                                               size_t niov,
+                                               int *fds,
+                                               size_t nfds,
+                                               int flags,
+                                               Error **errp)
 {
     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
     ssize_t ret;
@@ -558,20 +577,34 @@  static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
     }
 
  retry:
-    ret = sendmsg(sioc->fd, &msg, 0);
+    ret = sendmsg(sioc->fd, &msg, flags);
     if (ret <= 0) {
-        if (errno == EAGAIN) {
+        switch (errno) {
+        case EAGAIN:
             return QIO_CHANNEL_ERR_BLOCK;
-        }
-        if (errno == EINTR) {
+        case EINTR:
             goto retry;
+        case ENOBUFS:
+            return QIO_CHANNEL_ERR_NOBUFS;
         }
+
         error_setg_errno(errp, errno,
                          "Unable to write to socket");
         return -1;
     }
     return ret;
 }
+
+static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
+                                         const struct iovec *iov,
+                                         size_t niov,
+                                         int *fds,
+                                         size_t nfds,
+                                         Error **errp)
+{
+    return qio_channel_socket_writev_flags(ioc, iov, niov, fds, nfds, 0, errp);
+}
+
 #else /* WIN32 */
 static ssize_t qio_channel_socket_readv(QIOChannel *ioc,
                                         const struct iovec *iov,
@@ -658,6 +691,129 @@  static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
 }
 #endif /* WIN32 */
 
+
+#ifdef CONFIG_LINUX
+
+static int qio_channel_socket_poll(QIOChannelSocket *sioc, bool zerocopy,
+                                   Error **errp)
+{
+    struct pollfd pfd;
+    int ret;
+
+    pfd.fd = sioc->fd;
+    pfd.events = 0;
+
+ retry:
+    ret = poll(&pfd, 1, -1);
+    if (ret < 0) {
+        switch (errno) {
+        case EAGAIN:
+        case EINTR:
+            goto retry;
+        default:
+            error_setg_errno(errp, errno,
+                             "Poll error");
+            return ret;
+        }
+    }
+
+    if (pfd.revents & (POLLHUP | POLLNVAL)) {
+        error_setg(errp, "Poll error: Invalid or disconnected fd");
+        return -1;
+    }
+
+    if (!zerocopy && (pfd.revents & POLLERR)) {
+        error_setg(errp, "Poll error: Errors present in errqueue");
+        return -1;
+    }
+
+    return ret;
+}
+
+static ssize_t qio_channel_socket_writev_zerocopy(QIOChannel *ioc,
+                                                  const struct iovec *iov,
+                                                  size_t niov,
+                                                  Error **errp)
+{
+    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+    ssize_t ret;
+
+    ret = qio_channel_socket_writev_flags(ioc, iov, niov, NULL, 0,
+                                          MSG_ZEROCOPY, errp);
+    if (ret == QIO_CHANNEL_ERR_NOBUFS) {
+        if (errp && *errp) {
+            error_setg_errno(errp, errno,
+                             "Process can't lock enough memory for using MSG_ZEROCOPY");
+        }
+        return -1;
+    }
+
+    sioc->zerocopy_queued++;
+    return ret;
+}
+
+static int qio_channel_socket_flush_zerocopy(QIOChannel *ioc,
+                                             Error **errp)
+{
+    QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
+    struct msghdr msg = {};
+    struct sock_extended_err *serr;
+    struct cmsghdr *cm;
+    char control[CMSG_SPACE(sizeof(*serr))];
+    int ret;
+
+    msg.msg_control = control;
+    msg.msg_controllen = sizeof(control);
+    memset(control, 0, sizeof(control));
+
+    while (sioc->zerocopy_sent < sioc->zerocopy_queued) {
+        ret = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
+        if (ret < 0) {
+            switch (errno) {
+            case EAGAIN:
+                /* Nothing on errqueue, wait until something is available*/
+                ret = qio_channel_socket_poll(sioc, true, errp);
+                if (ret < 0) {
+                    return -1;
+                }
+                continue;
+            case EINTR:
+                continue;
+            default:
+                error_setg_errno(errp, errno,
+                                 "Unable to read errqueue");
+                return -1;
+            }
+        }
+
+        cm = CMSG_FIRSTHDR(&msg);
+        if (cm->cmsg_level != SOL_IP &&
+            cm->cmsg_type != IP_RECVERR) {
+            error_setg_errno(errp, EPROTOTYPE,
+                             "Wrong cmsg in errqueue");
+            return -1;
+        }
+
+        serr = (void *) CMSG_DATA(cm);
+        if (serr->ee_errno != SO_EE_ORIGIN_NONE) {
+            error_setg_errno(errp, serr->ee_errno,
+                             "Error on socket");
+            return -1;
+        }
+        if (serr->ee_origin != SO_EE_ORIGIN_ZEROCOPY) {
+            error_setg_errno(errp, serr->ee_origin,
+                             "Error not from zerocopy");
+            return -1;
+        }
+
+        /* No errors, count sucessfully finished sendmsg()*/
+        sioc->zerocopy_sent += serr->ee_data - serr->ee_info + 1;
+    }
+    return 0;
+}
+
+#endif /* CONFIG_LINUX */
+
 static int
 qio_channel_socket_set_blocking(QIOChannel *ioc,
                                 bool enabled,
@@ -787,6 +943,10 @@  static void qio_channel_socket_class_init(ObjectClass *klass,
     ioc_klass->io_set_delay = qio_channel_socket_set_delay;
     ioc_klass->io_create_watch = qio_channel_socket_create_watch;
     ioc_klass->io_set_aio_fd_handler = qio_channel_socket_set_aio_fd_handler;
+#ifdef CONFIG_LINUX
+    ioc_klass->io_writev_zerocopy = qio_channel_socket_writev_zerocopy;
+    ioc_klass->io_flush_zerocopy = qio_channel_socket_flush_zerocopy;
+#endif
 }
 
 static const TypeInfo qio_channel_socket_info = {