diff mbox series

[v3,1/3] QIOChannel: Add io_async_writev & io_async_flush callbacks

Message ID 20210922222423.644444-2-leobras@redhat.com (mailing list archive)
State New, archived
Headers show
Series QIOChannel async_write & async_flush + MSG_ZEROCOPY + multifd | expand

Commit Message

Leonardo Bras Sept. 22, 2021, 10:24 p.m. UTC
Adds io_async_writev and io_async_flush as optional callback to QIOChannelClass,
allowing the implementation of asynchronous writes by subclasses.

How to use them:
- Write data using qio_channel_async_writev(),
- Wait write completion with qio_channel_async_flush().

Notes:
Some asynchronous implementations may benefit from zerocopy mechanisms, so it's
recommended to keep the write buffer untouched until the return of
qio_channel_async_flush().

As the new callbacks are optional, if a subclass does not implement them
there will be a fallback to the mandatory synchronous implementation:
- io_async_writev will fallback to io_writev,
- io_async_flush will return without changing anything.
This makes simpler for the user to make use of the asynchronous implementation.

Also, some functions like qio_channel_writev_full_all() were adapted to
offer an async version, and make better use of the new callbacks.

Signed-off-by: Leonardo Bras <leobras@redhat.com>
---
 include/io/channel.h | 93 +++++++++++++++++++++++++++++++++++++-------
 io/channel.c         | 66 ++++++++++++++++++++++++-------
 2 files changed, 129 insertions(+), 30 deletions(-)

Comments

Daniel P. Berrangé Sept. 24, 2021, 5:16 p.m. UTC | #1
On Wed, Sep 22, 2021 at 07:24:21PM -0300, Leonardo Bras wrote:
> Adds io_async_writev and io_async_flush as optional callback to QIOChannelClass,
> allowing the implementation of asynchronous writes by subclasses.
> 
> How to use them:
> - Write data using qio_channel_async_writev(),
> - Wait write completion with qio_channel_async_flush().
> 
> Notes:
> Some asynchronous implementations may benefit from zerocopy mechanisms, so it's
> recommended to keep the write buffer untouched until the return of
> qio_channel_async_flush().
> 
> As the new callbacks are optional, if a subclass does not implement them
> there will be a fallback to the mandatory synchronous implementation:
> - io_async_writev will fallback to io_writev,
> - io_async_flush will return without changing anything.
> This makes simpler for the user to make use of the asynchronous implementation.
> 
> Also, some functions like qio_channel_writev_full_all() were adapted to
> offer an async version, and make better use of the new callbacks.
> 
> Signed-off-by: Leonardo Bras <leobras@redhat.com>
> ---
>  include/io/channel.h | 93 +++++++++++++++++++++++++++++++++++++-------
>  io/channel.c         | 66 ++++++++++++++++++++++++-------
>  2 files changed, 129 insertions(+), 30 deletions(-)
> 
> diff --git a/include/io/channel.h b/include/io/channel.h
> index 88988979f8..74f2e3ae8a 100644
> --- a/include/io/channel.h
> +++ b/include/io/channel.h
> @@ -136,6 +136,14 @@ struct QIOChannelClass {
>                                    IOHandler *io_read,
>                                    IOHandler *io_write,
>                                    void *opaque);
> +    ssize_t (*io_async_writev)(QIOChannel *ioc,
> +                               const struct iovec *iov,
> +                               size_t niov,
> +                               int *fds,
> +                               size_t nfds,
> +                               Error **errp);
> +   void (*io_async_flush)(QIOChannel *ioc,
> +                          Error **errp);
>  };
>  
>  /* General I/O handling functions */
> @@ -255,12 +263,17 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
>   * or QIO_CHANNEL_ERR_BLOCK if no data is can be sent
>   * and the channel is non-blocking
>   */
> -ssize_t qio_channel_writev_full(QIOChannel *ioc,
> -                                const struct iovec *iov,
> -                                size_t niov,
> -                                int *fds,
> -                                size_t nfds,
> -                                Error **errp);
> +ssize_t __qio_channel_writev_full(QIOChannel *ioc,

Using "__" is undesirable as that namespace is reserved.

> +                                  const struct iovec *iov,
> +                                  size_t niov,
> +                                  int *fds,
> +                                  size_t nfds,
> +                                  bool async,
> +                                  Error **errp);
> +#define qio_channel_writev_full(ioc, iov, niov, fds, nfds, errp) \
> +    __qio_channel_writev_full(ioc, iov, niov, fds, nfds, false, errp)
> +#define qio_channel_async_writev_full(ioc, iov, niov, fds, nfds, errp) \
> +    __qio_channel_writev_full(ioc, iov, niov, fds, nfds, true, errp)

The API docs only cover the first function, not the second.


>  /**
>   * qio_channel_readv_all_eof:
> @@ -339,10 +352,15 @@ int qio_channel_readv_all(QIOChannel *ioc,
>   *
>   * Returns: 0 if all bytes were written, or -1 on error
>   */
> -int qio_channel_writev_all(QIOChannel *ioc,
> -                           const struct iovec *iov,
> -                           size_t niov,
> -                           Error **erp);
> +int __qio_channel_writev_all(QIOChannel *ioc,
> +                             const struct iovec *iov,
> +                             size_t niov,
> +                             bool async,
> +                             Error **erp);
> +#define qio_channel_writev_all(ioc, iov, niov, erp) \
> +    __qio_channel_writev_all(ioc, iov, niov, false, erp)
> +#define qio_channel_async_writev_all(ioc, iov, niov, erp) \
> +    __qio_channel_writev_all(ioc, iov, niov, true, erp)


>  
>  /**
>   * qio_channel_readv:
> @@ -849,10 +867,55 @@ int qio_channel_readv_full_all(QIOChannel *ioc,
>   * Returns: 0 if all bytes were written, or -1 on error
>   */
>  
> -int qio_channel_writev_full_all(QIOChannel *ioc,
> -                                const struct iovec *iov,
> -                                size_t niov,
> -                                int *fds, size_t nfds,
> -                                Error **errp);
> +int __qio_channel_writev_full_all(QIOChannel *ioc,
> +                                  const struct iovec *iov,
> +                                  size_t niov,
> +                                  int *fds, size_t nfds,
> +                                  bool async, Error **errp);
> +#define qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, errp) \
> +    __qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, false, errp)
> +#define qio_channel_async_writev_full_all(ioc, iov, niov, fds, nfds, errp) \
> +    __qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, true, errp)
> +
> +/**
> + * qio_channel_async_writev:
> + * @ioc: the channel object
> + * @iov: the array of memory regions to write data from
> + * @niov: the length of the @iov array
> + * @fds: an array of file handles to send
> + * @nfds: number of file handles in @fds
> + * @errp: pointer to a NULL-initialized error object
> + *
> + * Behaves like qio_channel_writev_full, but will send
> + * data asynchronously, this meaning this function
> + * may return before the data is actually sent.
> + *
> + * If at some point it's necessary wait for all data to be
> + * sent, use qio_channel_async_flush().
> + *
> + * If not implemented, falls back to the default writev
> + */

I'm not convinced by the fallback here. If you're
layering I/O channels this is not going to result in
desirable behaviour.

eg if QIOChannelTLS doesn't implement async, then when
you call async_writev, it'lll invoke sync writev on
the QIOChannelTLS, which will in turn invoke the sync
writev on QIOChannelSocket, despite the latter having
async writev support.  I think this is very misleading
behaviour

> +ssize_t qio_channel_async_writev(QIOChannel *ioc,
> +                                 const struct iovec *iov,
> +                                 size_t niov,
> +                                 int *fds,
> +                                 size_t nfds,
> +                                 Error **errp);

This is missing any flags. We need something like

   QIO_CHANNEL_WRITE_FLAG_ZEROCOPY

passed in an 'unsigned int flags' parameter. This in
turn makes me question whether we should have the
common helpers at all, as the api is going to be
different for sync vs async.

The QIOChannelFeature enum probably ought to be
extended with QIO_CHANNEL_FEATURE_WRITE_ZEROCOPY with
support for probing whether that's supported or not.

> +
> +/**
> + * qio_channel_async_flush:
> + * @ioc: the channel object
> + * @errp: pointer to a NULL-initialized error object
> + *
> + * Will lock until every packet queued with qio_channel_async_writev()

s/lock/block/ I presume.

> + * is sent.
> + *
> + * If not implemented, returns without changing anything.
> + */
> +
> +void qio_channel_async_flush(QIOChannel *ioc,
> +                             Error **errp);
> +
>  
>  #endif /* QIO_CHANNEL_H */
> diff --git a/io/channel.c b/io/channel.c
> index e8b019dc36..c4819b922f 100644
> --- a/io/channel.c
> +++ b/io/channel.c
> @@ -67,12 +67,13 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
>  }
>  
>  
> -ssize_t qio_channel_writev_full(QIOChannel *ioc,
> -                                const struct iovec *iov,
> -                                size_t niov,
> -                                int *fds,
> -                                size_t nfds,
> -                                Error **errp)
> +ssize_t __qio_channel_writev_full(QIOChannel *ioc,
> +                                  const struct iovec *iov,
> +                                  size_t niov,
> +                                  int *fds,
> +                                  size_t nfds,
> +                                  bool async,
> +                                  Error **errp)
>  {
>      QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
>  
> @@ -83,6 +84,10 @@ ssize_t qio_channel_writev_full(QIOChannel *ioc,
>          return -1;
>      }
>  
> +    if (async && klass->io_async_writev) {
> +        return klass->io_async_writev(ioc, iov, niov, fds, nfds, errp);
> +    }
> +
>      return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
>  }
>  
> @@ -212,19 +217,20 @@ int qio_channel_readv_full_all(QIOChannel *ioc,
>      return ret;
>  }
>  
> -int qio_channel_writev_all(QIOChannel *ioc,
> -                           const struct iovec *iov,
> -                           size_t niov,
> -                           Error **errp)
> +int __qio_channel_writev_all(QIOChannel *ioc,
> +                             const struct iovec *iov,
> +                             size_t niov,
> +                             bool async,
> +                             Error **errp)
>  {
> -    return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, errp);
> +    return __qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, async, errp);
>  }
>  
> -int qio_channel_writev_full_all(QIOChannel *ioc,
> +int __qio_channel_writev_full_all(QIOChannel *ioc,
>                                  const struct iovec *iov,
>                                  size_t niov,
>                                  int *fds, size_t nfds,
> -                                Error **errp)
> +                                bool async, Error **errp)
>  {
>      int ret = -1;
>      struct iovec *local_iov = g_new(struct iovec, niov);
> @@ -237,8 +243,8 @@ int qio_channel_writev_full_all(QIOChannel *ioc,
>  
>      while (nlocal_iov > 0) {
>          ssize_t len;
> -        len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds,
> -                                      errp);
> +        len = __qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds,
> +                                        async, errp);
>          if (len == QIO_CHANNEL_ERR_BLOCK) {
>              if (qemu_in_coroutine()) {
>                  qio_channel_yield(ioc, G_IO_OUT);
> @@ -474,6 +480,36 @@ off_t qio_channel_io_seek(QIOChannel *ioc,
>  }
>  
>  
> +ssize_t qio_channel_async_writev(QIOChannel *ioc,
> +                                 const struct iovec *iov,
> +                                 size_t niov,
> +                                 int *fds,
> +                                 size_t nfds,
> +                                 Error **errp)
> +{
> +     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
> +
> +    if (!klass->io_async_writev) {
> +        return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
> +    }
> +
> +     return klass->io_async_writev(ioc, iov, niov, fds, nfds, errp);
> +}
> +
> +
> +void qio_channel_async_flush(QIOChannel *ioc,
> +                             Error **errp)
> +{
> +     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
> +
> +    if (!klass->io_async_flush) {
> +        return;
> +    }
> +
> +     klass->io_async_flush(ioc, errp);
> +}
> +
> +
>  static void qio_channel_restart_read(void *opaque)
>  {
>      QIOChannel *ioc = opaque;
> -- 
> 2.33.0
> 

Regards,
Daniel
Peter Xu Sept. 28, 2021, 9:52 p.m. UTC | #2
On Fri, Sep 24, 2021 at 06:16:04PM +0100, Daniel P. Berrangé wrote:
> > +ssize_t qio_channel_async_writev(QIOChannel *ioc,
> > +                                 const struct iovec *iov,
> > +                                 size_t niov,
> > +                                 int *fds,
> > +                                 size_t nfds,
> > +                                 Error **errp);
> 
> This is missing any flags. We need something like
> 
>    QIO_CHANNEL_WRITE_FLAG_ZEROCOPY
> 
> passed in an 'unsigned int flags' parameter. This in
> turn makes me question whether we should have the
> common helpers at all, as the api is going to be
> different for sync vs async.
> 
> The QIOChannelFeature enum probably ought to be
> extended with QIO_CHANNEL_FEATURE_WRITE_ZEROCOPY with
> support for probing whether that's supported or not.

I'm also wondering whether we could just drop the fds/nfds as per my knowledge
SCM_RIGHT is the only user, at the meantime I don't see why an async interface
would pass in any fd anyways..  Thanks,
Leonardo Bras Sept. 29, 2021, 7:03 p.m. UTC | #3
Hello Daniel, thank you for reviewing!

On Fri, Sep 24, 2021 at 2:16 PM Daniel P. Berrangé <berrange@redhat.com> wrote:
>
> On Wed, Sep 22, 2021 at 07:24:21PM -0300, Leonardo Bras wrote:
> > Adds io_async_writev and io_async_flush as optional callback to QIOChannelClass,
> > allowing the implementation of asynchronous writes by subclasses.
> >
> > How to use them:
> > - Write data using qio_channel_async_writev(),
> > - Wait write completion with qio_channel_async_flush().
> >
> > Notes:
> > Some asynchronous implementations may benefit from zerocopy mechanisms, so it's
> > recommended to keep the write buffer untouched until the return of
> > qio_channel_async_flush().
> >
> > As the new callbacks are optional, if a subclass does not implement them
> > there will be a fallback to the mandatory synchronous implementation:
> > - io_async_writev will fallback to io_writev,
> > - io_async_flush will return without changing anything.
> > This makes simpler for the user to make use of the asynchronous implementation.
> >
> > Also, some functions like qio_channel_writev_full_all() were adapted to
> > offer an async version, and make better use of the new callbacks.
> >
> > Signed-off-by: Leonardo Bras <leobras@redhat.com>
> > ---
> >  include/io/channel.h | 93 +++++++++++++++++++++++++++++++++++++-------
> >  io/channel.c         | 66 ++++++++++++++++++++++++-------
> >  2 files changed, 129 insertions(+), 30 deletions(-)
> >
> > diff --git a/include/io/channel.h b/include/io/channel.h
> > index 88988979f8..74f2e3ae8a 100644
> > --- a/include/io/channel.h
> > +++ b/include/io/channel.h
> > @@ -136,6 +136,14 @@ struct QIOChannelClass {
> >                                    IOHandler *io_read,
> >                                    IOHandler *io_write,
> >                                    void *opaque);
> > +    ssize_t (*io_async_writev)(QIOChannel *ioc,
> > +                               const struct iovec *iov,
> > +                               size_t niov,
> > +                               int *fds,
> > +                               size_t nfds,
> > +                               Error **errp);
> > +   void (*io_async_flush)(QIOChannel *ioc,
> > +                          Error **errp);
> >  };
> >
> >  /* General I/O handling functions */
> > @@ -255,12 +263,17 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
> >   * or QIO_CHANNEL_ERR_BLOCK if no data is can be sent
> >   * and the channel is non-blocking
> >   */
> > -ssize_t qio_channel_writev_full(QIOChannel *ioc,
> > -                                const struct iovec *iov,
> > -                                size_t niov,
> > -                                int *fds,
> > -                                size_t nfds,
> > -                                Error **errp);
> > +ssize_t __qio_channel_writev_full(QIOChannel *ioc,
>
> Using "__" is undesirable as that namespace is reserved.

Thank you for the tip!
I will make sure to remember avoiding this in the future.


>
> > +                                  const struct iovec *iov,
> > +                                  size_t niov,
> > +                                  int *fds,
> > +                                  size_t nfds,
> > +                                  bool async,
> > +                                  Error **errp);
> > +#define qio_channel_writev_full(ioc, iov, niov, fds, nfds, errp) \
> > +    __qio_channel_writev_full(ioc, iov, niov, fds, nfds, false, errp)
> > +#define qio_channel_async_writev_full(ioc, iov, niov, fds, nfds, errp) \
> > +    __qio_channel_writev_full(ioc, iov, niov, fds, nfds, true, errp)
>
> The API docs only cover the first function, not the second.

You are right.
If this ends up being the final implementation, I will make sure to
provide correct docs.

>
>
> >  /**
> >   * qio_channel_readv_all_eof:
> > @@ -339,10 +352,15 @@ int qio_channel_readv_all(QIOChannel *ioc,
> >   *
> >   * Returns: 0 if all bytes were written, or -1 on error
> >   */
> > -int qio_channel_writev_all(QIOChannel *ioc,
> > -                           const struct iovec *iov,
> > -                           size_t niov,
> > -                           Error **erp);
> > +int __qio_channel_writev_all(QIOChannel *ioc,
> > +                             const struct iovec *iov,
> > +                             size_t niov,
> > +                             bool async,
> > +                             Error **erp);
> > +#define qio_channel_writev_all(ioc, iov, niov, erp) \
> > +    __qio_channel_writev_all(ioc, iov, niov, false, erp)
> > +#define qio_channel_async_writev_all(ioc, iov, niov, erp) \
> > +    __qio_channel_writev_all(ioc, iov, niov, true, erp)
>
>
> >
> >  /**
> >   * qio_channel_readv:
> > @@ -849,10 +867,55 @@ int qio_channel_readv_full_all(QIOChannel *ioc,
> >   * Returns: 0 if all bytes were written, or -1 on error
> >   */
> >
> > -int qio_channel_writev_full_all(QIOChannel *ioc,
> > -                                const struct iovec *iov,
> > -                                size_t niov,
> > -                                int *fds, size_t nfds,
> > -                                Error **errp);
> > +int __qio_channel_writev_full_all(QIOChannel *ioc,
> > +                                  const struct iovec *iov,
> > +                                  size_t niov,
> > +                                  int *fds, size_t nfds,
> > +                                  bool async, Error **errp);
> > +#define qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, errp) \
> > +    __qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, false, errp)
> > +#define qio_channel_async_writev_full_all(ioc, iov, niov, fds, nfds, errp) \
> > +    __qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, true, errp)
> > +
> > +/**
> > + * qio_channel_async_writev:
> > + * @ioc: the channel object
> > + * @iov: the array of memory regions to write data from
> > + * @niov: the length of the @iov array
> > + * @fds: an array of file handles to send
> > + * @nfds: number of file handles in @fds
> > + * @errp: pointer to a NULL-initialized error object
> > + *
> > + * Behaves like qio_channel_writev_full, but will send
> > + * data asynchronously, this meaning this function
> > + * may return before the data is actually sent.
> > + *
> > + * If at some point it's necessary wait for all data to be
> > + * sent, use qio_channel_async_flush().
> > + *
> > + * If not implemented, falls back to the default writev
> > + */
>
> I'm not convinced by the fallback here. If you're
> layering I/O channels this is not going to result in
> desirable behaviour.
>
> eg if QIOChannelTLS doesn't implement async, then when
> you call async_writev, it'lll invoke sync writev on
> the QIOChannelTLS, which will in turn invoke the sync
> writev on QIOChannelSocket, despite the latter having
> async writev support.  I think this is very misleading
> behaviour

Yeah, it's a good point.
Failing when async is not supported seems a better approach.

>
> > +ssize_t qio_channel_async_writev(QIOChannel *ioc,
> > +                                 const struct iovec *iov,
> > +                                 size_t niov,
> > +                                 int *fds,
> > +                                 size_t nfds,
> > +                                 Error **errp);
>
> This is missing any flags. We need something like
>
>    QIO_CHANNEL_WRITE_FLAG_ZEROCOPY
>
> passed in an 'unsigned int flags' parameter. This in
> turn makes me question whether we should have the
> common helpers at all, as the api is going to be
> different for sync vs async.
>
> The QIOChannelFeature enum probably ought to be
> extended with QIO_CHANNEL_FEATURE_WRITE_ZEROCOPY with
> support for probing whether that's supported or not.

Yeah, that makes sense to me.

>
> > +
> > +/**
> > + * qio_channel_async_flush:
> > + * @ioc: the channel object
> > + * @errp: pointer to a NULL-initialized error object
> > + *
> > + * Will lock until every packet queued with qio_channel_async_writev()
>
> s/lock/block/ I presume.

correct.

>
> > + * is sent.
> > + *
> > + * If not implemented, returns without changing anything.
> > + */
> > +
> > +void qio_channel_async_flush(QIOChannel *ioc,
> > +                             Error **errp);
> > +
> >
> >  #endif /* QIO_CHANNEL_H */
> > diff --git a/io/channel.c b/io/channel.c
> > index e8b019dc36..c4819b922f 100644
> > --- a/io/channel.c
> > +++ b/io/channel.c
> > @@ -67,12 +67,13 @@ ssize_t qio_channel_readv_full(QIOChannel *ioc,
> >  }
> >
> >
> > -ssize_t qio_channel_writev_full(QIOChannel *ioc,
> > -                                const struct iovec *iov,
> > -                                size_t niov,
> > -                                int *fds,
> > -                                size_t nfds,
> > -                                Error **errp)
> > +ssize_t __qio_channel_writev_full(QIOChannel *ioc,
> > +                                  const struct iovec *iov,
> > +                                  size_t niov,
> > +                                  int *fds,
> > +                                  size_t nfds,
> > +                                  bool async,
> > +                                  Error **errp)
> >  {
> >      QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
> >
> > @@ -83,6 +84,10 @@ ssize_t qio_channel_writev_full(QIOChannel *ioc,
> >          return -1;
> >      }
> >
> > +    if (async && klass->io_async_writev) {
> > +        return klass->io_async_writev(ioc, iov, niov, fds, nfds, errp);
> > +    }
> > +
> >      return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
> >  }
> >
> > @@ -212,19 +217,20 @@ int qio_channel_readv_full_all(QIOChannel *ioc,
> >      return ret;
> >  }
> >
> > -int qio_channel_writev_all(QIOChannel *ioc,
> > -                           const struct iovec *iov,
> > -                           size_t niov,
> > -                           Error **errp)
> > +int __qio_channel_writev_all(QIOChannel *ioc,
> > +                             const struct iovec *iov,
> > +                             size_t niov,
> > +                             bool async,
> > +                             Error **errp)
> >  {
> > -    return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, errp);
> > +    return __qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, async, errp);
> >  }
> >
> > -int qio_channel_writev_full_all(QIOChannel *ioc,
> > +int __qio_channel_writev_full_all(QIOChannel *ioc,
> >                                  const struct iovec *iov,
> >                                  size_t niov,
> >                                  int *fds, size_t nfds,
> > -                                Error **errp)
> > +                                bool async, Error **errp)
> >  {
> >      int ret = -1;
> >      struct iovec *local_iov = g_new(struct iovec, niov);
> > @@ -237,8 +243,8 @@ int qio_channel_writev_full_all(QIOChannel *ioc,
> >
> >      while (nlocal_iov > 0) {
> >          ssize_t len;
> > -        len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds,
> > -                                      errp);
> > +        len = __qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds,
> > +                                        async, errp);
> >          if (len == QIO_CHANNEL_ERR_BLOCK) {
> >              if (qemu_in_coroutine()) {
> >                  qio_channel_yield(ioc, G_IO_OUT);
> > @@ -474,6 +480,36 @@ off_t qio_channel_io_seek(QIOChannel *ioc,
> >  }
> >
> >
> > +ssize_t qio_channel_async_writev(QIOChannel *ioc,
> > +                                 const struct iovec *iov,
> > +                                 size_t niov,
> > +                                 int *fds,
> > +                                 size_t nfds,
> > +                                 Error **errp)
> > +{
> > +     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
> > +
> > +    if (!klass->io_async_writev) {
> > +        return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
> > +    }
> > +
> > +     return klass->io_async_writev(ioc, iov, niov, fds, nfds, errp);
> > +}
> > +
> > +
> > +void qio_channel_async_flush(QIOChannel *ioc,
> > +                             Error **errp)
> > +{
> > +     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
> > +
> > +    if (!klass->io_async_flush) {
> > +        return;
> > +    }
> > +
> > +     klass->io_async_flush(ioc, errp);
> > +}
> > +
> > +
> >  static void qio_channel_restart_read(void *opaque)
> >  {
> >      QIOChannel *ioc = opaque;
> > --
> > 2.33.0
> >
>
> Regards,
> Daniel
> --
> |: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
> |: https://libvirt.org         -o-            https://fstop138.berrange.com :|
> |: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|
>
Leonardo Bras Sept. 29, 2021, 7:06 p.m. UTC | #4
Hello Peter, thanks for reviewing!

On Tue, Sep 28, 2021 at 6:52 PM Peter Xu <peterx@redhat.com> wrote:
>
> On Fri, Sep 24, 2021 at 06:16:04PM +0100, Daniel P. Berrangé wrote:
> > > +ssize_t qio_channel_async_writev(QIOChannel *ioc,
> > > +                                 const struct iovec *iov,
> > > +                                 size_t niov,
> > > +                                 int *fds,
> > > +                                 size_t nfds,
> > > +                                 Error **errp);
> >
> > This is missing any flags. We need something like
> >
> >    QIO_CHANNEL_WRITE_FLAG_ZEROCOPY
> >
> > passed in an 'unsigned int flags' parameter. This in
> > turn makes me question whether we should have the
> > common helpers at all, as the api is going to be
> > different for sync vs async.
> >
> > The QIOChannelFeature enum probably ought to be
> > extended with QIO_CHANNEL_FEATURE_WRITE_ZEROCOPY with
> > support for probing whether that's supported or not.
>
> I'm also wondering whether we could just drop the fds/nfds as per my knowledge
> SCM_RIGHT is the only user, at the meantime I don't see why an async interface
> would pass in any fd anyways..  Thanks,

FWIW, I think it's a great idea.
Daniel, what do you think?

>
> --
> Peter Xu
>

Best regards,
Leonardo
Daniel P. Berrangé Sept. 30, 2021, 8:34 a.m. UTC | #5
On Wed, Sep 29, 2021 at 04:06:33PM -0300, Leonardo Bras Soares Passos wrote:
> Hello Peter, thanks for reviewing!
> 
> On Tue, Sep 28, 2021 at 6:52 PM Peter Xu <peterx@redhat.com> wrote:
> >
> > On Fri, Sep 24, 2021 at 06:16:04PM +0100, Daniel P. Berrangé wrote:
> > > > +ssize_t qio_channel_async_writev(QIOChannel *ioc,
> > > > +                                 const struct iovec *iov,
> > > > +                                 size_t niov,
> > > > +                                 int *fds,
> > > > +                                 size_t nfds,
> > > > +                                 Error **errp);
> > >
> > > This is missing any flags. We need something like
> > >
> > >    QIO_CHANNEL_WRITE_FLAG_ZEROCOPY
> > >
> > > passed in an 'unsigned int flags' parameter. This in
> > > turn makes me question whether we should have the
> > > common helpers at all, as the api is going to be
> > > different for sync vs async.
> > >
> > > The QIOChannelFeature enum probably ought to be
> > > extended with QIO_CHANNEL_FEATURE_WRITE_ZEROCOPY with
> > > support for probing whether that's supported or not.
> >
> > I'm also wondering whether we could just drop the fds/nfds as per my knowledge
> > SCM_RIGHT is the only user, at the meantime I don't see why an async interface
> > would pass in any fd anyways..  Thanks,
> 
> FWIW, I think it's a great idea.
> Daniel, what do you think?

Yes, FD passing is not compatible with async operations, becuase it is
too complex to deal with FD lifetime on failure to send IO


Regards,
Daniel
diff mbox series

Patch

diff --git a/include/io/channel.h b/include/io/channel.h
index 88988979f8..74f2e3ae8a 100644
--- a/include/io/channel.h
+++ b/include/io/channel.h
@@ -136,6 +136,14 @@  struct QIOChannelClass {
                                   IOHandler *io_read,
                                   IOHandler *io_write,
                                   void *opaque);
+    ssize_t (*io_async_writev)(QIOChannel *ioc,
+                               const struct iovec *iov,
+                               size_t niov,
+                               int *fds,
+                               size_t nfds,
+                               Error **errp);
+   void (*io_async_flush)(QIOChannel *ioc,
+                          Error **errp);
 };
 
 /* General I/O handling functions */
@@ -255,12 +263,17 @@  ssize_t qio_channel_readv_full(QIOChannel *ioc,
  * or QIO_CHANNEL_ERR_BLOCK if no data is can be sent
  * and the channel is non-blocking
  */
-ssize_t qio_channel_writev_full(QIOChannel *ioc,
-                                const struct iovec *iov,
-                                size_t niov,
-                                int *fds,
-                                size_t nfds,
-                                Error **errp);
+ssize_t __qio_channel_writev_full(QIOChannel *ioc,
+                                  const struct iovec *iov,
+                                  size_t niov,
+                                  int *fds,
+                                  size_t nfds,
+                                  bool async,
+                                  Error **errp);
+#define qio_channel_writev_full(ioc, iov, niov, fds, nfds, errp) \
+    __qio_channel_writev_full(ioc, iov, niov, fds, nfds, false, errp)
+#define qio_channel_async_writev_full(ioc, iov, niov, fds, nfds, errp) \
+    __qio_channel_writev_full(ioc, iov, niov, fds, nfds, true, errp)
 
 /**
  * qio_channel_readv_all_eof:
@@ -339,10 +352,15 @@  int qio_channel_readv_all(QIOChannel *ioc,
  *
  * Returns: 0 if all bytes were written, or -1 on error
  */
-int qio_channel_writev_all(QIOChannel *ioc,
-                           const struct iovec *iov,
-                           size_t niov,
-                           Error **erp);
+int __qio_channel_writev_all(QIOChannel *ioc,
+                             const struct iovec *iov,
+                             size_t niov,
+                             bool async,
+                             Error **erp);
+#define qio_channel_writev_all(ioc, iov, niov, erp) \
+    __qio_channel_writev_all(ioc, iov, niov, false, erp)
+#define qio_channel_async_writev_all(ioc, iov, niov, erp) \
+    __qio_channel_writev_all(ioc, iov, niov, true, erp)
 
 /**
  * qio_channel_readv:
@@ -849,10 +867,55 @@  int qio_channel_readv_full_all(QIOChannel *ioc,
  * Returns: 0 if all bytes were written, or -1 on error
  */
 
-int qio_channel_writev_full_all(QIOChannel *ioc,
-                                const struct iovec *iov,
-                                size_t niov,
-                                int *fds, size_t nfds,
-                                Error **errp);
+int __qio_channel_writev_full_all(QIOChannel *ioc,
+                                  const struct iovec *iov,
+                                  size_t niov,
+                                  int *fds, size_t nfds,
+                                  bool async, Error **errp);
+#define qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, errp) \
+    __qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, false, errp)
+#define qio_channel_async_writev_full_all(ioc, iov, niov, fds, nfds, errp) \
+    __qio_channel_writev_full_all(ioc, iov, niov, fds, nfds, true, errp)
+
+/**
+ * qio_channel_async_writev:
+ * @ioc: the channel object
+ * @iov: the array of memory regions to write data from
+ * @niov: the length of the @iov array
+ * @fds: an array of file handles to send
+ * @nfds: number of file handles in @fds
+ * @errp: pointer to a NULL-initialized error object
+ *
+ * Behaves like qio_channel_writev_full, but will send
+ * data asynchronously, this meaning this function
+ * may return before the data is actually sent.
+ *
+ * If at some point it's necessary wait for all data to be
+ * sent, use qio_channel_async_flush().
+ *
+ * If not implemented, falls back to the default writev
+ */
+
+ssize_t qio_channel_async_writev(QIOChannel *ioc,
+                                 const struct iovec *iov,
+                                 size_t niov,
+                                 int *fds,
+                                 size_t nfds,
+                                 Error **errp);
+
+/**
+ * qio_channel_async_flush:
+ * @ioc: the channel object
+ * @errp: pointer to a NULL-initialized error object
+ *
+ * Will lock until every packet queued with qio_channel_async_writev()
+ * is sent.
+ *
+ * If not implemented, returns without changing anything.
+ */
+
+void qio_channel_async_flush(QIOChannel *ioc,
+                             Error **errp);
+
 
 #endif /* QIO_CHANNEL_H */
diff --git a/io/channel.c b/io/channel.c
index e8b019dc36..c4819b922f 100644
--- a/io/channel.c
+++ b/io/channel.c
@@ -67,12 +67,13 @@  ssize_t qio_channel_readv_full(QIOChannel *ioc,
 }
 
 
-ssize_t qio_channel_writev_full(QIOChannel *ioc,
-                                const struct iovec *iov,
-                                size_t niov,
-                                int *fds,
-                                size_t nfds,
-                                Error **errp)
+ssize_t __qio_channel_writev_full(QIOChannel *ioc,
+                                  const struct iovec *iov,
+                                  size_t niov,
+                                  int *fds,
+                                  size_t nfds,
+                                  bool async,
+                                  Error **errp)
 {
     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
 
@@ -83,6 +84,10 @@  ssize_t qio_channel_writev_full(QIOChannel *ioc,
         return -1;
     }
 
+    if (async && klass->io_async_writev) {
+        return klass->io_async_writev(ioc, iov, niov, fds, nfds, errp);
+    }
+
     return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
 }
 
@@ -212,19 +217,20 @@  int qio_channel_readv_full_all(QIOChannel *ioc,
     return ret;
 }
 
-int qio_channel_writev_all(QIOChannel *ioc,
-                           const struct iovec *iov,
-                           size_t niov,
-                           Error **errp)
+int __qio_channel_writev_all(QIOChannel *ioc,
+                             const struct iovec *iov,
+                             size_t niov,
+                             bool async,
+                             Error **errp)
 {
-    return qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, errp);
+    return __qio_channel_writev_full_all(ioc, iov, niov, NULL, 0, async, errp);
 }
 
-int qio_channel_writev_full_all(QIOChannel *ioc,
+int __qio_channel_writev_full_all(QIOChannel *ioc,
                                 const struct iovec *iov,
                                 size_t niov,
                                 int *fds, size_t nfds,
-                                Error **errp)
+                                bool async, Error **errp)
 {
     int ret = -1;
     struct iovec *local_iov = g_new(struct iovec, niov);
@@ -237,8 +243,8 @@  int qio_channel_writev_full_all(QIOChannel *ioc,
 
     while (nlocal_iov > 0) {
         ssize_t len;
-        len = qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds,
-                                      errp);
+        len = __qio_channel_writev_full(ioc, local_iov, nlocal_iov, fds, nfds,
+                                        async, errp);
         if (len == QIO_CHANNEL_ERR_BLOCK) {
             if (qemu_in_coroutine()) {
                 qio_channel_yield(ioc, G_IO_OUT);
@@ -474,6 +480,36 @@  off_t qio_channel_io_seek(QIOChannel *ioc,
 }
 
 
+ssize_t qio_channel_async_writev(QIOChannel *ioc,
+                                 const struct iovec *iov,
+                                 size_t niov,
+                                 int *fds,
+                                 size_t nfds,
+                                 Error **errp)
+{
+     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+
+    if (!klass->io_async_writev) {
+        return klass->io_writev(ioc, iov, niov, fds, nfds, errp);
+    }
+
+     return klass->io_async_writev(ioc, iov, niov, fds, nfds, errp);
+}
+
+
+void qio_channel_async_flush(QIOChannel *ioc,
+                             Error **errp)
+{
+     QIOChannelClass *klass = QIO_CHANNEL_GET_CLASS(ioc);
+
+    if (!klass->io_async_flush) {
+        return;
+    }
+
+     klass->io_async_flush(ioc, errp);
+}
+
+
 static void qio_channel_restart_read(void *opaque)
 {
     QIOChannel *ioc = opaque;