diff mbox series

[v2,1/3] QIOCHannel: Add io_async_writev & io_async_flush callbacks

Message ID 20210922050340.614781-2-leobras@redhat.com (mailing list archive)
State New, archived
Headers show
Series QIOChannel async_write & async_flush + MSG_ZEROCOPY + multifd | expand

Commit Message

Leonardo Bras Sept. 22, 2021, 5:03 a.m. UTC
Adds io_async_writev and io_async_flush as optional callback to QIOChannelClass,
allowing the implementation of asynchronous writes by subclasses.

How to use them:
- Write data using qio_channel_async_writev(),
- Wait write completion with qio_channel_async_flush().

Notes:
Some asynchronous implementations may benefit from zerocopy mechanisms, so it's
recommended to keep the write buffer untouched until the return of
qio_channel_async_flush().

As the new callbacks are optional, if a subclass does not implement them
there will be a fallback to the mandatory synchronous implementation:
- io_async_writev will fallback to io_writev,
- io_async_flush will return without changing anything.
This makes simpler for the user to make use of the asynchronous implementation.

Also, some functions like qio_channel_writev_full_all() were adapted to
offer an async version, and make better use of the new callbacks.

Signed-off-by: Leonardo Bras <leobras@redhat.com>
---
 include/io/channel.h | 93 +++++++++++++++++++++++++++++++++++++-------
 io/channel.c         | 66 ++++++++++++++++++++++++-------
 2 files changed, 129 insertions(+), 30 deletions(-)
diff mbox series

Patch

diff --git a/include/io/channel.h b/include/io/channel.h
index 88988979f8..74f2e3ae8a 100644
--- a/include/io/channel.h
+++ b/include/io/channel.h
@@ -136,6 +136,14 @@  struct QIOChannelClass {
                                   IOHandler *io_read,
                                   IOHandler *io_write,
                                   void *opaque);
+    ssize_t (*io_async_writev)(QIOChannel *ioc,
+                               const struct iovec *iov,
+                               size_t niov,
+                               int *fds,
+                               size_t nfds,
+                               Error **errp);
+   void (*io_async_flush)(QIOChannel *ioc,
+                          Error **errp);
 };
 
 /* General I/O handling functions */
@@ -255,12 +263,17 @@  ssize_t qio_channel_readv_full(QIOChannel *ioc,
  * or QIO_CHANNEL_ERR_BLOCK if no data is can be sent
  * and the channel is non-blocking
  */
-ssize_t qio_channel_writev_full(QIOChannel *ioc,
-                                const struct iovec *iov,
-                                size_t niov,
-                                int *fds,
-                                size_t nfds,
-                                Error **errp);
+ssize_t __qio_channel_writev_full(QIOChannel *ioc,
+                                  const struct iovec *iov,
+                                  size_t niov,
+                                  int *fds,
+                                  size_t nfds,
+                                  bool async,
+                                  Error **errp);
+#define qio_channel_writev_full(ioc, iov, niov, fds, nfds, errp) \
+    __qio_channel_writev_full(ioc, iov, niov, fds, nfds, false, errp)
+#define qio_channel_async_writev_full(ioc, iov, niov, fds, nfds, errp) \
+    __qio_channel_writev_full(ioc, iov, niov, fds, nfds, true, errp)
 
 /**
  * qio_channel_readv_all_eof:
@@ -339,10 +352,15 @@  int qio_channel_readv_all(QIOChannel *ioc,
  *
  * Returns: 0 if all bytes were written, or -1 on error
  */
-int qio_channel_writev_all(QIOChannel *ioc,
-                           const struct iovec *iov,
-                           size_t niov,
-                           Error **erp);
+int __qio_channel_writev_all(QIOChannel *ioc,
+                             const struct iovec *iov,
+                             size_t niov,
+                             bool async,
+                             Error **erp);
+#define qio_channel_writev_all(ioc, iov, niov, erp) \
+    __qio_channel_writev_all(ioc, iov, niov, false, erp)
+#define qio_channel_async_writev_all(ioc, iov, niov, erp) \
+    __qio_channel_writev_all(ioc, iov, niov, true, erp)
 
 /**
  * qio_channel_readv:
@@ -849,10 +867,55 @@  int qio_channel_readv_full_all(QIOChannel *ioc,
  * Returns: 0 if all bytes were written, or -1 on error
  */
 
-int qio_channel_writev_full_all(QIOChannel *ioc,
-                                const struct iovec *iov,
-                                size_t niov,
-                                int *fds, size_t nfds,
-                                Error **errp);
+int __qio_channel_writev_full_all(QIOChannel *ioc,
+                                  const struct iovec *iov,
+                                  size_t niov,
+                                  int *fds, size_t nfds,
+                                  bool async, Error **errp);
+#define qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, errp) \
+    __qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, false, errp)
+#define qio_channel_async_writev_full_all(ioc, iov, niov, fds, nfds, errp) \
+    __qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, true, errp)
+
+/**
+ * qio_channel_async_writev:
+ * @ioc: the channel object
+ * @iov: the array of memory regions to write data from
+ * @niov: the length of the @iov array
+ * @fds: an array of file handles to send
+ * @nfds: number of file handles in @fds
+ * @errp: pointer to a NULL-initialized error object
+ *
+ * Behaves like qio_channel_writev_full, but will send
+ * data asynchronously, this meaning this function
+ * may return before the data is actually sent.
+ *
+ * If at some point it's necessary wait for all data to be
+ * sent, use qio_channel_async_flush().
+ *
+ * If not implemented, falls back to the default writev
+ */
+
+ssize_t qio_channel_async_writev(QIOChannel *ioc,
+                                 const struct iovec *iov,
+                                 size_t niov,
+                                 int *fds,
+                                 size_t nfds,
+                                 Error **errp);
+
+/**
+ * qio_channel_async_flush:
+ * @ioc: the channel object
+ * @errp: pointer to a NULL-initialized error object
+ *
+ * Will lock until every packet queued with qio_channel_async_writev()
+ * is sent.
+ *
+ * If not implemented, returns without changing anything.
+ */
+
+void qio_channel_async_flush(QIOChannel *ioc,
+                             Error **errp);
+
 
 #endif /* QIO_CHANNEL_H */
diff --git a/io/channel.c b/io/channel.c
index e8b019dc36..a35109a006 100644
--- a/io/channel.c
+++ b/io/channel.c
@@ -67,12 +67,13 @@  ssize_t qio_channel_readv_full(QIOChannel *ioc,
 }
 
 
-ssize_t qio_channel_writev_full(QIOChannel *ioc,
-                                const struct iovec *iov,
-                                size_t niov,
-                                int *fds,
-                                size_t nfds,
-                                Error **errp)
+ssize_t __qio_channel_writev_full(QIOChannel *ioc,
+                                  const struct iovec *iov,
+                                  size_t niov,
+                                  int *fds,
+                                  size_t nfds,
+                                  bool async,
+                                  Error **errp)
 {
     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 
@@ -83,6 +84,10 @@  ssize_t qio_channel_writev_full(QIOChannel *ioc,
         return -1;
     }
 
+    if (async) {
+        return klass->io_async_writev(ioc, iov, niov, fds, nfds, errp);
+    }
+
     return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
 }
 
@@ -212,19 +217,20 @@  int qio_channel_readv_full_all(QIOChannel *ioc,
     return ret;
 }
 
-int qio_channel_writev_all(QIOChannel *ioc,
-                           const struct iovec *iov,
-                           size_t niov,
-                           Error **errp)
+int __qio_channel_writev_all(QIOChannel *ioc,
+                             const struct iovec *iov,
+                             size_t niov,
+                             bool async,
+                             Error **errp)
 {
-    return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, errp);
+    return __qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, async, errp);
 }
 
-int qio_channel_writev_full_all(QIOChannel *ioc,
+int __qio_channel_writev_full_all(QIOChannel *ioc,
                                 const struct iovec *iov,
                                 size_t niov,
                                 int *fds, size_t nfds,
-                                Error **errp)
+                                bool async, Error **errp)
 {
     int ret = -1;
     struct iovec *local_iov = g_new(struct iovec, niov);
@@ -237,8 +243,8 @@  int qio_channel_writev_full_all(QIOChannel *ioc,
 
     while (nlocal_iov > 0) {
         ssize_t len;
-        len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds,
-                                      errp);
+        len = __qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds,
+                                        async, errp);
         if (len == QIO_CHANNEL_ERR_BLOCK) {
             if (qemu_in_coroutine()) {
                 qio_channel_yield(ioc, G_IO_OUT);
@@ -474,6 +480,36 @@  off_t qio_channel_io_seek(QIOChannel *ioc,
 }
 
 
+ssize_t qio_channel_async_writev(QIOChannel *ioc,
+                                 const struct iovec *iov,
+                                 size_t niov,
+                                 int *fds,
+                                 size_t nfds,
+                                 Error **errp)
+{
+     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+
+    if (!klass->io_async_writev) {
+        return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
+    }
+
+     return klass->io_async_writev(ioc, iov, niov, fds, nfds, errp);
+}
+
+
+void qio_channel_async_flush(QIOChannel *ioc,
+                             Error **errp)
+{
+     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+
+    if (!klass->io_async_flush) {
+        return;
+    }
+
+     klass->io_async_flush(ioc, errp);
+}
+
+
 static void qio_channel_restart_read(void *opaque)
 {
     QIOChannel *ioc = opaque;