diff mbox series

[RFC,3/4] QIOChannel: Add max_pending parameter to qio_channel_flush()

Message ID 20221025044730.319941-4-leobras@redhat.com (mailing list archive)
State New, archived
Headers show
Series MultiFD zero-copy improvements | expand

Commit Message

Leonardo Bras Oct. 25, 2022, 4:47 a.m. UTC
Zero-copy write in Linux is an asynchronous type of write, meaning the send
process is not finished by the time the function returns. Since it's also
zero-copy, it means that incorrect data may be sent if the write buffer
gets modified after write returns.

To check if a zero-copy write is finished, qio_channel has implemented a
flush operation: qio_channel_flush(). As of today, by the time the flush
returns, user code knows for sure all previous zero-copy write have
finished, and it's safe to modify any write buffer.

While this kind of flush is necessary, it may take a while to flush if
there has been a write recently, as the OS has to wait for everything to be
sent before returning and allowing reuse / free of the write buffers.

An option that can improve performance in some scenarios is to modify flush
so it guarantees less than N zero-copy writes are pending, allowing some of
the write buffers to be reused. This allows flush to return much faster,
since it does not need to wait for the more recent writes to complete.

If (N == 0), then it replicates the previous flushing behavior.

Add max_pending parameter to qio_channel_flush() so caller can decide
what is the maximum number of pending writes remaining before the function
returns. Also, implement this change in qio_channel_socket_flush().

Change current calls of qio_channel_flush() so (max_pending == 0), and the
flush-all behavior is maintained.

Signed-off-by: Leonardo Bras <leobras@redhat.com>
---
 include/io/channel.h | 7 +++++--
 io/channel-socket.c  | 5 +++--
 io/channel.c         | 5 +++--
 migration/multifd.c  | 2 +-
 4 files changed, 12 insertions(+), 7 deletions(-)
diff mbox series

Patch

diff --git a/include/io/channel.h b/include/io/channel.h
index c680ee7480..9ec1978a26 100644
--- a/include/io/channel.h
+++ b/include/io/channel.h
@@ -141,6 +141,7 @@  struct QIOChannelClass {
                                   IOHandler *io_write,
                                   void *opaque);
     int (*io_flush)(QIOChannel *ioc,
+                    int max_pending,
                     Error **errp);
 };
 
@@ -875,11 +876,12 @@  int qio_channel_writev_full_all(QIOChannel *ioc,
 /**
  * qio_channel_flush:
  * @ioc: the channel object
+ * @max_pending: Maximum remaining writes allowed in queue upon returning
  * @errp: pointer to a NULL-initialized error object
  *
- * Will block until every packet queued with
+ * Will block until there are at most max_pending writes called with
  * qio_channel_writev_full() + QIO_CHANNEL_WRITE_FLAG_ZERO_COPY
- * is sent, or return in case of any error.
+ * pending, or return in case of any error.
  *
  * If not implemented, acts as a no-op, and returns 0.
  *
@@ -889,6 +891,7 @@  int qio_channel_writev_full_all(QIOChannel *ioc,
  */
 
 int qio_channel_flush(QIOChannel *ioc,
+                      int max_pending,
                       Error **errp);
 
 #endif /* QIO_CHANNEL_H */
diff --git a/io/channel-socket.c b/io/channel-socket.c
index b76dca9cc1..3d0c2b8a14 100644
--- a/io/channel-socket.c
+++ b/io/channel-socket.c
@@ -708,6 +708,7 @@  static ssize_t qio_channel_socket_writev(QIOChannel *ioc,
 
 #ifdef QEMU_MSG_ZEROCOPY
 static int qio_channel_socket_flush(QIOChannel *ioc,
+                                    int max_pending,
                                     Error **errp)
 {
     QIOChannelSocket *sioc = QIO_CHANNEL_SOCKET(ioc);
@@ -718,7 +719,7 @@  static int qio_channel_socket_flush(QIOChannel *ioc,
     int received;
     int ret;
 
-    if (sioc->zero_copy_queued == sioc->zero_copy_sent) {
+    if (sioc->zero_copy_queued - sioc->zero_copy_sent <= max_pending) {
         return 0;
     }
 
@@ -728,7 +729,7 @@  static int qio_channel_socket_flush(QIOChannel *ioc,
 
     ret = 1;
 
-    while (sioc->zero_copy_sent < sioc->zero_copy_queued) {
+    while (sioc->zero_copy_queued - sioc->zero_copy_sent > max_pending) {
         received = recvmsg(sioc->fd, &msg, MSG_ERRQUEUE);
         if (received < 0) {
             switch (errno) {
diff --git a/io/channel.c b/io/channel.c
index 0640941ac5..9d9f15af50 100644
--- a/io/channel.c
+++ b/io/channel.c
@@ -490,7 +490,8 @@  off_t qio_channel_io_seek(QIOChannel *ioc,
 }
 
 int qio_channel_flush(QIOChannel *ioc,
-                                Error **errp)
+                      int max_pending,
+                      Error **errp)
 {
     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 
@@ -499,7 +500,7 @@  int qio_channel_flush(QIOChannel *ioc,
         return 0;
     }
 
-    return klass->io_flush(ioc, errp);
+    return klass->io_flush(ioc, max_pending, errp);
 }
 
 
diff --git a/migration/multifd.c b/migration/multifd.c
index aa18c47141..c5d1f911a4 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -574,7 +574,7 @@  static int multifd_zero_copy_flush(QIOChannel *c)
     int ret;
     Error *err = NULL;
 
-    ret = qio_channel_flush(c, &err);
+    ret = qio_channel_flush(c, 0, &err);
     if (ret < 0) {
         error_report_err(err);
         return -1;