diff mbox series

[3/6] io/channel-rdma: support working in coroutine

Message ID 1717503252-51884-4-git-send-email-arei.gonglei@huawei.com (mailing list archive)
State New, archived
Headers show
Series refactor RDMA live migration based on rsocket API | expand

Commit Message

Gonglei (Arei) June 4, 2024, 12:14 p.m. UTC
From: Jialin Wang <wangjialin23@huawei.com>

It is not feasible to obtain RDMA completion queue notifications
through poll/ppoll on the rsocket fd. Therefore, we create a thread
named rpoller for each rsocket fd and two eventfds: pollin_eventfd
and pollout_eventfd.

When using io_create_watch or io_set_aio_fd_handler waits for POLLIN
or POLLOUT events, it will actually poll/ppoll on the pollin_eventfd
and pollout_eventfd instead of the rsocket fd.

The rpoller rpoll() on the rsocket fd to receive POLLIN and POLLOUT
events.
When a POLLIN event occurs, the rpoller write the pollin_eventfd,
and then poll/ppoll will return the POLLIN event.
When a POLLOUT event occurs, the rpoller read the pollout_eventfd,
and then poll/ppoll will return the POLLOUT event.

For a non-blocking rsocket fd, if rread/rwrite returns EAGAIN, it will
read/write the pollin/pollout_eventfd, preventing poll/ppoll from
returning POLLIN/POLLOUT events.

Known limitations:

  For a blocking rsocket fd, if we use io_create_watch to wait for
  POLLIN or POLLOUT events, since the rsocket fd is blocking, we
  cannot determine when it is not ready to read/write as we can with
  non-blocking fds. Therefore, when an event occurs, it will occurs
  always, potentially leave the qemu hanging. So we need be cautious
  to avoid hanging when using io_create_watch .

Luckily, channel-rdma works well in coroutines :)

Signed-off-by: Jialin Wang <wangjialin23@huawei.com>
Signed-off-by: Gonglei <arei.gonglei@huawei.com>
---
 include/io/channel-rdma.h |  15 +-
 io/channel-rdma.c         | 363 +++++++++++++++++++++++++++++++++++++-
 2 files changed, 376 insertions(+), 2 deletions(-)

Comments

Haris Iqbal June 6, 2024, 1:34 p.m. UTC | #1
On Tue, Jun 4, 2024 at 2:14 PM Gonglei <arei.gonglei@huawei.com> wrote:
>
> From: Jialin Wang <wangjialin23@huawei.com>
>
> It is not feasible to obtain RDMA completion queue notifications
> through poll/ppoll on the rsocket fd. Therefore, we create a thread
> named rpoller for each rsocket fd and two eventfds: pollin_eventfd
> and pollout_eventfd.
>
> When using io_create_watch or io_set_aio_fd_handler waits for POLLIN
> or POLLOUT events, it will actually poll/ppoll on the pollin_eventfd
> and pollout_eventfd instead of the rsocket fd.
>
> The rpoller rpoll() on the rsocket fd to receive POLLIN and POLLOUT
> events.
> When a POLLIN event occurs, the rpoller write the pollin_eventfd,
> and then poll/ppoll will return the POLLIN event.
> When a POLLOUT event occurs, the rpoller read the pollout_eventfd,
> and then poll/ppoll will return the POLLOUT event.
>
> For a non-blocking rsocket fd, if rread/rwrite returns EAGAIN, it will
> read/write the pollin/pollout_eventfd, preventing poll/ppoll from
> returning POLLIN/POLLOUT events.
>
> Known limitations:
>
>   For a blocking rsocket fd, if we use io_create_watch to wait for
>   POLLIN or POLLOUT events, since the rsocket fd is blocking, we
>   cannot determine when it is not ready to read/write as we can with
>   non-blocking fds. Therefore, when an event occurs, it will occurs
>   always, potentially leave the qemu hanging. So we need be cautious
>   to avoid hanging when using io_create_watch .
>
> Luckily, channel-rdma works well in coroutines :)
>
> Signed-off-by: Jialin Wang <wangjialin23@huawei.com>
> Signed-off-by: Gonglei <arei.gonglei@huawei.com>
> ---
>  include/io/channel-rdma.h |  15 +-
>  io/channel-rdma.c         | 363 +++++++++++++++++++++++++++++++++++++-
>  2 files changed, 376 insertions(+), 2 deletions(-)
>
> diff --git a/include/io/channel-rdma.h b/include/io/channel-rdma.h
> index 8cab2459e5..cb56127d76 100644
> --- a/include/io/channel-rdma.h
> +++ b/include/io/channel-rdma.h
> @@ -47,6 +47,18 @@ struct QIOChannelRDMA {
>      socklen_t localAddrLen;
>      struct sockaddr_storage remoteAddr;
>      socklen_t remoteAddrLen;
> +
> +    /* private */
> +
> +    /* qemu g_poll/ppoll() POLLIN event on it */
> +    int pollin_eventfd;
> +    /* qemu g_poll/ppoll() POLLOUT event on it */
> +    int pollout_eventfd;
> +
> +    /* the index in the rpoller's fds array */
> +    int index;
> +    /* rpoller will rpoll() rpoll_events on the rsocket fd */
> +    short int rpoll_events;
>  };
>
>  /**
> @@ -147,6 +159,7 @@ void qio_channel_rdma_listen_async(QIOChannelRDMA *ioc, InetSocketAddress *addr,
>   *
>   * Returns: the new client channel, or NULL on error
>   */
> -QIOChannelRDMA *qio_channel_rdma_accept(QIOChannelRDMA *ioc, Error **errp);
> +QIOChannelRDMA *coroutine_mixed_fn qio_channel_rdma_accept(QIOChannelRDMA *ioc,
> +                                                           Error **errp);
>
>  #endif /* QIO_CHANNEL_RDMA_H */
> diff --git a/io/channel-rdma.c b/io/channel-rdma.c
> index 92c362df52..9792add5cf 100644
> --- a/io/channel-rdma.c
> +++ b/io/channel-rdma.c
> @@ -23,10 +23,15 @@
>
>  #include "qemu/osdep.h"
>  #include "io/channel-rdma.h"
> +#include "io/channel-util.h"
> +#include "io/channel-watch.h"
>  #include "io/channel.h"
>  #include "qapi/clone-visitor.h"
>  #include "qapi/error.h"
>  #include "qapi/qapi-visit-sockets.h"
> +#include "qemu/atomic.h"
> +#include "qemu/error-report.h"
> +#include "qemu/thread.h"
>  #include "trace.h"
>  #include <errno.h>
>  #include <netdb.h>
> @@ -39,11 +44,274 @@
>  #include <sys/poll.h>
>  #include <unistd.h>
>
> +typedef enum {
> +    CLEAR_POLLIN,
> +    CLEAR_POLLOUT,
> +    SET_POLLIN,
> +    SET_POLLOUT,
> +} UpdateEvent;
> +
> +typedef enum {
> +    RP_CMD_ADD_IOC,
> +    RP_CMD_DEL_IOC,
> +    RP_CMD_UPDATE,
> +} RpollerCMD;
> +
> +typedef struct {
> +    RpollerCMD cmd;
> +    QIOChannelRDMA *rioc;
> +} RpollerMsg;
> +
> +/*
> + * rpoll() on the rsocket fd with rpoll_events, when POLLIN/POLLOUT event
> + * occurs, it will write/read the pollin_eventfd/pollout_eventfd to allow
> + * qemu g_poll/ppoll() get the POLLIN/POLLOUT event
> + */
> +static struct Rpoller {
> +    QemuThread thread;
> +    bool is_running;
> +    int sock[2];
> +    int count; /* the number of rsocket fds being rpoll() */
> +    int size; /* the size of fds/riocs */
> +    struct pollfd *fds;
> +    QIOChannelRDMA **riocs;
> +} rpoller;
> +
> +static void qio_channel_rdma_notify_rpoller(QIOChannelRDMA *rioc,
> +                                            RpollerCMD cmd)
> +{
> +    RpollerMsg msg;
> +    int ret;
> +
> +    msg.cmd = cmd;
> +    msg.rioc = rioc;
> +
> +    ret = RETRY_ON_EINTR(write(rpoller.sock[0], &msg, sizeof msg));
> +    if (ret != sizeof msg) {
> +        error_report("%s: failed to send msg, errno: %d", __func__, errno);
> +    }
> +}
> +
> +static void qio_channel_rdma_update_poll_event(QIOChannelRDMA *rioc,
> +                                               UpdateEvent action,
> +                                               bool notify_rpoller)
> +{
> +    /* An eventfd with the value of ULLONG_MAX - 1 is readable but unwritable */
> +    unsigned long long buf = ULLONG_MAX - 1;
> +
> +    switch (action) {
> +    /* only rpoller do SET_* action, to allow qemu ppoll() get the event */
> +    case SET_POLLIN:
> +        RETRY_ON_EINTR(write(rioc->pollin_eventfd, &buf, sizeof buf));
> +        rioc->rpoll_events &= ~POLLIN;
> +        break;
> +    case SET_POLLOUT:
> +        RETRY_ON_EINTR(read(rioc->pollout_eventfd, &buf, sizeof buf));
> +        rioc->rpoll_events &= ~POLLOUT;
> +        break;
> +
> +    /* the rsocket fd is not ready to rread/rwrite */
> +    case CLEAR_POLLIN:
> +        RETRY_ON_EINTR(read(rioc->pollin_eventfd, &buf, sizeof buf));
> +        rioc->rpoll_events |= POLLIN;
> +        break;
> +    case CLEAR_POLLOUT:
> +        RETRY_ON_EINTR(write(rioc->pollout_eventfd, &buf, sizeof buf));
> +        rioc->rpoll_events |= POLLOUT;
> +        break;
> +    default:
> +        break;
> +    }
> +
> +    /* notify rpoller to rpoll() POLLIN/POLLOUT events */
> +    if (notify_rpoller) {
> +        qio_channel_rdma_notify_rpoller(rioc, RP_CMD_UPDATE);
> +    }
> +}
> +
> +static void qio_channel_rdma_rpoller_add_rioc(QIOChannelRDMA *rioc)
> +{
> +    if (rioc->index != -1) {
> +        error_report("%s: rioc already exsits", __func__);
> +        return;
> +    }
> +
> +    rioc->index = ++rpoller.count;
> +
> +    if (rpoller.count + 1 > rpoller.size) {
> +        rpoller.size *= 2;
> +        rpoller.fds = g_renew(struct pollfd, rpoller.fds, rpoller.size);
> +        rpoller.riocs = g_renew(QIOChannelRDMA *, rpoller.riocs, rpoller.size);
> +    }
> +
> +    rpoller.fds[rioc->index].fd = rioc->fd;
> +    rpoller.fds[rioc->index].events = rioc->rpoll_events;

The allotment of rioc fds and events to rpoller slots are sequential,
but making the deletion also sequentials means that the del_rioc needs
to be called in the exact opposite sequence as they were added
(through add_rioc). Otherwise we leaves holes in between, and
readditions might step on an already used slot.

Does this setup make sure that the above restriction is satisfied, or
am I missing something?

> +    rpoller.riocs[rioc->index] = rioc;
> +}
> +
> +static void qio_channel_rdma_rpoller_del_rioc(QIOChannelRDMA *rioc)
> +{
> +    if (rioc->index == -1) {
> +        error_report("%s: rioc not exsits", __func__);
> +        return;
> +    }
> +
> +    rpoller.fds[rioc->index] = rpoller.fds[rpoller.count];

Should this be rpoller.count-1?

> +    rpoller.riocs[rioc->index] = rpoller.riocs[rpoller.count];
> +    rpoller.riocs[rioc->index]->index = rioc->index;
> +    rpoller.count--;
> +
> +    close(rioc->pollin_eventfd);
> +    close(rioc->pollout_eventfd);
> +    rioc->index = -1;
> +    rioc->rpoll_events = 0;
> +}
> +
> +static void qio_channel_rdma_rpoller_update_ioc(QIOChannelRDMA *rioc)
> +{
> +    if (rioc->index == -1) {
> +        error_report("%s: rioc not exsits", __func__);
> +        return;
> +    }
> +
> +    rpoller.fds[rioc->index].fd = rioc->fd;
> +    rpoller.fds[rioc->index].events = rioc->rpoll_events;
> +}
> +
> +static void qio_channel_rdma_rpoller_process_msg(void)
> +{
> +    RpollerMsg msg;
> +    int ret;
> +
> +    ret = RETRY_ON_EINTR(read(rpoller.sock[1], &msg, sizeof msg));
> +    if (ret != sizeof msg) {
> +        error_report("%s: rpoller failed to recv msg: %s", __func__,
> +                     strerror(errno));
> +        return;
> +    }
> +
> +    switch (msg.cmd) {
> +    case RP_CMD_ADD_IOC:
> +        qio_channel_rdma_rpoller_add_rioc(msg.rioc);
> +        break;
> +    case RP_CMD_DEL_IOC:
> +        qio_channel_rdma_rpoller_del_rioc(msg.rioc);
> +        break;
> +    case RP_CMD_UPDATE:
> +        qio_channel_rdma_rpoller_update_ioc(msg.rioc);
> +        break;
> +    default:
> +        break;
> +    }
> +}
> +
> +static void qio_channel_rdma_rpoller_cleanup(void)
> +{
> +    close(rpoller.sock[0]);
> +    close(rpoller.sock[1]);
> +    rpoller.sock[0] = -1;
> +    rpoller.sock[1] = -1;
> +    g_free(rpoller.fds);
> +    g_free(rpoller.riocs);
> +    rpoller.fds = NULL;
> +    rpoller.riocs = NULL;
> +    rpoller.count = 0;
> +    rpoller.size = 0;
> +    rpoller.is_running = false;
> +}
> +
> +static void *qio_channel_rdma_rpoller_thread(void *opaque)
> +{
> +    int i, ret, error_events = POLLERR | POLLHUP | POLLNVAL;
> +
> +    do {
> +        ret = rpoll(rpoller.fds, rpoller.count + 1, -1);
> +        if (ret < 0 && errno != -EINTR) {
> +            error_report("%s: rpoll() error: %s", __func__, strerror(errno));
> +            break;
> +        }
> +
> +        for (i = 1; i <= rpoller.count; i++) {
> +            if (rpoller.fds[i].revents & (POLLIN | error_events)) {
> +                qio_channel_rdma_update_poll_event(rpoller.riocs[i], SET_POLLIN,
> +                                                   false);
> +                rpoller.fds[i].events &= ~POLLIN;
> +            }
> +            if (rpoller.fds[i].revents & (POLLOUT | error_events)) {
> +                qio_channel_rdma_update_poll_event(rpoller.riocs[i],
> +                                                   SET_POLLOUT, false);
> +                rpoller.fds[i].events &= ~POLLOUT;
> +            }
> +            /* ignore this fd */
> +            if (rpoller.fds[i].revents & (error_events)) {
> +                rpoller.fds[i].fd = -1;
> +            }
> +        }
> +
> +        if (rpoller.fds[0].revents) {
> +            qio_channel_rdma_rpoller_process_msg();
> +        }
> +    } while (rpoller.count >= 1);
> +
> +    qio_channel_rdma_rpoller_cleanup();
> +
> +    return NULL;
> +}
> +
> +static void qio_channel_rdma_rpoller_start(void)
> +{
> +    if (qatomic_xchg(&rpoller.is_running, true)) {
> +        return;
> +    }
> +
> +    if (qemu_socketpair(AF_UNIX, SOCK_STREAM, 0, rpoller.sock)) {
> +        rpoller.is_running = false;
> +        error_report("%s: failed to create socketpair %s", __func__,
> +                     strerror(errno));
> +        return;
> +    }
> +
> +    rpoller.count = 0;
> +    rpoller.size = 4;
> +    rpoller.fds = g_malloc0_n(rpoller.size, sizeof(struct pollfd));
> +    rpoller.riocs = g_malloc0_n(rpoller.size, sizeof(QIOChannelRDMA *));
> +    rpoller.fds[0].fd = rpoller.sock[1];
> +    rpoller.fds[0].events = POLLIN;
> +
> +    qemu_thread_create(&rpoller.thread, "qio-channel-rdma-rpoller",
> +                       qio_channel_rdma_rpoller_thread, NULL,
> +                       QEMU_THREAD_JOINABLE);
> +}
> +
> +static void qio_channel_rdma_add_rioc_to_rpoller(QIOChannelRDMA *rioc)
> +{
> +    int flags = EFD_CLOEXEC | EFD_NONBLOCK;
> +
> +    /*
> +     * A single eventfd is either readable or writable. A single eventfd cannot
> +     * represent a state where it is neither readable nor writable. so use two
> +     * eventfds here.
> +     */
> +    rioc->pollin_eventfd = eventfd(0, flags);
> +    rioc->pollout_eventfd = eventfd(0, flags);
> +    /* pollout_eventfd with the value 0, means writable, make it unwritable */
> +    qio_channel_rdma_update_poll_event(rioc, CLEAR_POLLOUT, false);
> +
> +    /* tell the rpoller to rpoll() events on rioc->socketfd */
> +    rioc->rpoll_events = POLLIN | POLLOUT;
> +    qio_channel_rdma_notify_rpoller(rioc, RP_CMD_ADD_IOC);
> +}
> +
>  QIOChannelRDMA *qio_channel_rdma_new(void)
>  {
>      QIOChannelRDMA *rioc;
>      QIOChannel *ioc;
>
> +    qio_channel_rdma_rpoller_start();
> +    if (!rpoller.is_running) {
> +        return NULL;
> +    }
> +
>      rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
>      ioc = QIO_CHANNEL(rioc);
>      qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
> @@ -125,6 +393,8 @@ retry:
>          goto out;
>      }
>
> +    qio_channel_rdma_add_rioc_to_rpoller(rioc);
> +
>  out:
>      if (ret) {
>          trace_qio_channel_rdma_connect_fail(rioc);
> @@ -211,6 +481,8 @@ int qio_channel_rdma_listen_sync(QIOChannelRDMA *rioc, InetSocketAddress *addr,
>      qio_channel_set_feature(QIO_CHANNEL(rioc), QIO_CHANNEL_FEATURE_LISTEN);
>      trace_qio_channel_rdma_listen_complete(rioc, fd);
>
> +    qio_channel_rdma_add_rioc_to_rpoller(rioc);
> +
>  out:
>      if (ret) {
>          trace_qio_channel_rdma_listen_fail(rioc);
> @@ -267,8 +539,10 @@ void qio_channel_rdma_listen_async(QIOChannelRDMA *ioc, InetSocketAddress *addr,
>                             qio_channel_listen_worker_free, context);
>  }
>
> -QIOChannelRDMA *qio_channel_rdma_accept(QIOChannelRDMA *rioc, Error **errp)
> +QIOChannelRDMA *coroutine_mixed_fn qio_channel_rdma_accept(QIOChannelRDMA *rioc,
> +                                                           Error **errp)
>  {
> +    QIOChannel *ioc = QIO_CHANNEL(rioc);
>      QIOChannelRDMA *cioc;
>
>      cioc = qio_channel_rdma_new();
> @@ -283,6 +557,17 @@ retry:
>          if (errno == EINTR) {
>              goto retry;
>          }
> +        if (errno == EAGAIN) {
> +            if (!(rioc->rpoll_events & POLLIN)) {
> +                qio_channel_rdma_update_poll_event(rioc, CLEAR_POLLIN, true);
> +            }
> +            if (qemu_in_coroutine()) {
> +                qio_channel_yield(ioc, G_IO_IN);
> +            } else {
> +                qio_channel_wait(ioc, G_IO_IN);
> +            }
> +            goto retry;
> +        }
>          error_setg_errno(errp, errno, "Unable to accept connection");
>          goto error;
>      }
> @@ -294,6 +579,8 @@ retry:
>          goto error;
>      }
>
> +    qio_channel_rdma_add_rioc_to_rpoller(cioc);
> +
>      trace_qio_channel_rdma_accept_complete(rioc, cioc, cioc->fd);
>      return cioc;
>
> @@ -307,6 +594,10 @@ static void qio_channel_rdma_init(Object *obj)
>  {
>      QIOChannelRDMA *ioc = QIO_CHANNEL_RDMA(obj);
>      ioc->fd = -1;
> +    ioc->pollin_eventfd = -1;
> +    ioc->pollout_eventfd = -1;
> +    ioc->index = -1;
> +    ioc->rpoll_events = 0;
>  }
>
>  static void qio_channel_rdma_finalize(Object *obj)
> @@ -314,6 +605,7 @@ static void qio_channel_rdma_finalize(Object *obj)
>      QIOChannelRDMA *ioc = QIO_CHANNEL_RDMA(obj);
>
>      if (ioc->fd != -1) {
> +        qio_channel_rdma_notify_rpoller(ioc, RP_CMD_DEL_IOC);
>          rclose(ioc->fd);
>          ioc->fd = -1;
>      }
> @@ -330,6 +622,12 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc, const struct iovec *iov,
>  retry:
>      ret = rreadv(rioc->fd, iov, niov);
>      if (ret < 0) {
> +        if (errno == EAGAIN) {
> +            if (!(rioc->rpoll_events & POLLIN)) {
> +                qio_channel_rdma_update_poll_event(rioc, CLEAR_POLLIN, true);
> +            }
> +            return QIO_CHANNEL_ERR_BLOCK;
> +        }
>          if (errno == EINTR) {
>              goto retry;
>          }
> @@ -351,6 +649,12 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc, const struct iovec *iov,
>  retry:
>      ret = rwritev(rioc->fd, iov, niov);
>      if (ret <= 0) {
> +        if (errno == EAGAIN) {
> +            if (!(rioc->rpoll_events & POLLOUT)) {
> +                qio_channel_rdma_update_poll_event(rioc, CLEAR_POLLOUT, true);
> +            }
> +            return QIO_CHANNEL_ERR_BLOCK;
> +        }
>          if (errno == EINTR) {
>              goto retry;
>          }
> @@ -361,6 +665,28 @@ retry:
>      return ret;
>  }
>
> +static int qio_channel_rdma_set_blocking(QIOChannel *ioc, bool enabled,
> +                                         Error **errp G_GNUC_UNUSED)
> +{
> +    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
> +    int flags, ret;
> +
> +    flags = rfcntl(rioc->fd, F_GETFL);
> +    if (enabled) {
> +        flags &= ~O_NONBLOCK;
> +    } else {
> +        flags |= O_NONBLOCK;
> +    }
> +
> +    ret = rfcntl(rioc->fd, F_SETFL, flags);
> +    if (ret) {
> +        error_setg_errno(errp, errno,
> +                         "Unable to rfcntl rsocket fd with flags %d", flags);
> +    }
> +
> +    return ret;
> +}
> +
>  static void qio_channel_rdma_set_delay(QIOChannel *ioc, bool enabled)
>  {
>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
> @@ -374,6 +700,7 @@ static int qio_channel_rdma_close(QIOChannel *ioc, Error **errp)
>      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
>
>      if (rioc->fd != -1) {
> +        qio_channel_rdma_notify_rpoller(rioc, RP_CMD_DEL_IOC);
>          rclose(rioc->fd);
>          rioc->fd = -1;
>      }
> @@ -408,6 +735,37 @@ static int qio_channel_rdma_shutdown(QIOChannel *ioc, QIOChannelShutdown how,
>      return 0;
>  }
>
> +static void
> +qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc, AioContext *read_ctx,
> +                                    IOHandler *io_read, AioContext *write_ctx,
> +                                    IOHandler *io_write, void *opaque)
> +{
> +    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
> +
> +    qio_channel_util_set_aio_fd_handler(rioc->pollin_eventfd, read_ctx, io_read,
> +                                        rioc->pollout_eventfd, write_ctx,
> +                                        io_write, opaque);
> +}
> +
> +static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
> +                                              GIOCondition condition)
> +{
> +    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
> +
> +    switch (condition) {
> +    case G_IO_IN:
> +        return qio_channel_create_fd_watch(ioc, rioc->pollin_eventfd,
> +                                           condition);
> +    case G_IO_OUT:
> +        return qio_channel_create_fd_watch(ioc, rioc->pollout_eventfd,
> +                                           condition);
> +    default:
> +        error_report("%s: do not support watch 0x%x event", __func__,
> +                     condition);
> +        return NULL;
> +    }
> +}
> +
>  static void qio_channel_rdma_class_init(ObjectClass *klass,
>                                          void *class_data G_GNUC_UNUSED)
>  {
> @@ -415,9 +773,12 @@ static void qio_channel_rdma_class_init(ObjectClass *klass,
>
>      ioc_klass->io_writev = qio_channel_rdma_writev;
>      ioc_klass->io_readv = qio_channel_rdma_readv;
> +    ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
>      ioc_klass->io_close = qio_channel_rdma_close;
>      ioc_klass->io_shutdown = qio_channel_rdma_shutdown;
>      ioc_klass->io_set_delay = qio_channel_rdma_set_delay;
> +    ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
> +    ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler;
>  }
>
>  static const TypeInfo qio_channel_rdma_info = {
> --
> 2.43.0
>
>
Gao,Shiyuan" via June 7, 2024, 8:45 a.m. UTC | #2
> -----Original Message-----
> From: Haris Iqbal [mailto:haris.iqbal@ionos.com]
> Sent: Thursday, June 6, 2024 9:35 PM
> To: Gonglei (Arei) <arei.gonglei@huawei.com>
> Cc: qemu-devel@nongnu.org; peterx@redhat.com; yu.zhang@ionos.com;
> mgalaxy@akamai.com; elmar.gerdes@ionos.com; zhengchuan
> <zhengchuan@huawei.com>; berrange@redhat.com; armbru@redhat.com;
> lizhijian@fujitsu.com; pbonzini@redhat.com; mst@redhat.com; Xiexiangyou
> <xiexiangyou@huawei.com>; linux-rdma@vger.kernel.org; lixiao (H)
> <lixiao91@huawei.com>; jinpu.wang@ionos.com; Wangjialin
> <wangjialin23@huawei.com>
> Subject: Re: [PATCH 3/6] io/channel-rdma: support working in coroutine
> 
> On Tue, Jun 4, 2024 at 2:14 PM Gonglei <arei.gonglei@huawei.com> wrote:
> >
> > From: Jialin Wang <wangjialin23@huawei.com>
> >
> > It is not feasible to obtain RDMA completion queue notifications
> > through poll/ppoll on the rsocket fd. Therefore, we create a thread
> > named rpoller for each rsocket fd and two eventfds: pollin_eventfd and
> > pollout_eventfd.
> >
> > When using io_create_watch or io_set_aio_fd_handler waits for POLLIN
> > or POLLOUT events, it will actually poll/ppoll on the pollin_eventfd
> > and pollout_eventfd instead of the rsocket fd.
> >
> > The rpoller rpoll() on the rsocket fd to receive POLLIN and POLLOUT
> > events.
> > When a POLLIN event occurs, the rpoller write the pollin_eventfd, and
> > then poll/ppoll will return the POLLIN event.
> > When a POLLOUT event occurs, the rpoller read the pollout_eventfd, and
> > then poll/ppoll will return the POLLOUT event.
> >
> > For a non-blocking rsocket fd, if rread/rwrite returns EAGAIN, it will
> > read/write the pollin/pollout_eventfd, preventing poll/ppoll from
> > returning POLLIN/POLLOUT events.
> >
> > Known limitations:
> >
> >   For a blocking rsocket fd, if we use io_create_watch to wait for
> >   POLLIN or POLLOUT events, since the rsocket fd is blocking, we
> >   cannot determine when it is not ready to read/write as we can with
> >   non-blocking fds. Therefore, when an event occurs, it will occurs
> >   always, potentially leave the qemu hanging. So we need be cautious
> >   to avoid hanging when using io_create_watch .
> >
> > Luckily, channel-rdma works well in coroutines :)
> >
> > Signed-off-by: Jialin Wang <wangjialin23@huawei.com>
> > Signed-off-by: Gonglei <arei.gonglei@huawei.com>
> > ---
> >  include/io/channel-rdma.h |  15 +-
> >  io/channel-rdma.c         | 363
> +++++++++++++++++++++++++++++++++++++-
> >  2 files changed, 376 insertions(+), 2 deletions(-)
> >
> > diff --git a/include/io/channel-rdma.h b/include/io/channel-rdma.h
> > index 8cab2459e5..cb56127d76 100644
> > --- a/include/io/channel-rdma.h
> > +++ b/include/io/channel-rdma.h
> > @@ -47,6 +47,18 @@ struct QIOChannelRDMA {
> >      socklen_t localAddrLen;
> >      struct sockaddr_storage remoteAddr;
> >      socklen_t remoteAddrLen;
> > +
> > +    /* private */
> > +
> > +    /* qemu g_poll/ppoll() POLLIN event on it */
> > +    int pollin_eventfd;
> > +    /* qemu g_poll/ppoll() POLLOUT event on it */
> > +    int pollout_eventfd;
> > +
> > +    /* the index in the rpoller's fds array */
> > +    int index;
> > +    /* rpoller will rpoll() rpoll_events on the rsocket fd */
> > +    short int rpoll_events;
> >  };
> >
> >  /**
> > @@ -147,6 +159,7 @@ void
> qio_channel_rdma_listen_async(QIOChannelRDMA *ioc, InetSocketAddress
> *addr,
> >   *
> >   * Returns: the new client channel, or NULL on error
> >   */
> > -QIOChannelRDMA *qio_channel_rdma_accept(QIOChannelRDMA *ioc,
> Error
> > **errp);
> > +QIOChannelRDMA *coroutine_mixed_fn
> qio_channel_rdma_accept(QIOChannelRDMA *ioc,
> > +
> Error
> > +**errp);
> >
> >  #endif /* QIO_CHANNEL_RDMA_H */
> > diff --git a/io/channel-rdma.c b/io/channel-rdma.c index
> > 92c362df52..9792add5cf 100644
> > --- a/io/channel-rdma.c
> > +++ b/io/channel-rdma.c
> > @@ -23,10 +23,15 @@
> >
> >  #include "qemu/osdep.h"
> >  #include "io/channel-rdma.h"
> > +#include "io/channel-util.h"
> > +#include "io/channel-watch.h"
> >  #include "io/channel.h"
> >  #include "qapi/clone-visitor.h"
> >  #include "qapi/error.h"
> >  #include "qapi/qapi-visit-sockets.h"
> > +#include "qemu/atomic.h"
> > +#include "qemu/error-report.h"
> > +#include "qemu/thread.h"
> >  #include "trace.h"
> >  #include <errno.h>
> >  #include <netdb.h>
> > @@ -39,11 +44,274 @@
> >  #include <sys/poll.h>
> >  #include <unistd.h>
> >
> > +typedef enum {
> > +    CLEAR_POLLIN,
> > +    CLEAR_POLLOUT,
> > +    SET_POLLIN,
> > +    SET_POLLOUT,
> > +} UpdateEvent;
> > +
> > +typedef enum {
> > +    RP_CMD_ADD_IOC,
> > +    RP_CMD_DEL_IOC,
> > +    RP_CMD_UPDATE,
> > +} RpollerCMD;
> > +
> > +typedef struct {
> > +    RpollerCMD cmd;
> > +    QIOChannelRDMA *rioc;
> > +} RpollerMsg;
> > +
> > +/*
> > + * rpoll() on the rsocket fd with rpoll_events, when POLLIN/POLLOUT
> > +event
> > + * occurs, it will write/read the pollin_eventfd/pollout_eventfd to
> > +allow
> > + * qemu g_poll/ppoll() get the POLLIN/POLLOUT event  */ static struct
> > +Rpoller {
> > +    QemuThread thread;
> > +    bool is_running;
> > +    int sock[2];
> > +    int count; /* the number of rsocket fds being rpoll() */
> > +    int size; /* the size of fds/riocs */
> > +    struct pollfd *fds;
> > +    QIOChannelRDMA **riocs;
> > +} rpoller;
> > +
> > +static void qio_channel_rdma_notify_rpoller(QIOChannelRDMA *rioc,
> > +                                            RpollerCMD cmd) {
> > +    RpollerMsg msg;
> > +    int ret;
> > +
> > +    msg.cmd = cmd;
> > +    msg.rioc = rioc;
> > +
> > +    ret = RETRY_ON_EINTR(write(rpoller.sock[0], &msg, sizeof msg));
> > +    if (ret != sizeof msg) {
> > +        error_report("%s: failed to send msg, errno: %d", __func__,
> errno);
> > +    }
> > +}
> > +
> > +static void qio_channel_rdma_update_poll_event(QIOChannelRDMA *rioc,
> > +                                               UpdateEvent
> action,
> > +                                               bool notify_rpoller)
> {
> > +    /* An eventfd with the value of ULLONG_MAX - 1 is readable but
> unwritable */
> > +    unsigned long long buf = ULLONG_MAX - 1;
> > +
> > +    switch (action) {
> > +    /* only rpoller do SET_* action, to allow qemu ppoll() get the event */
> > +    case SET_POLLIN:
> > +        RETRY_ON_EINTR(write(rioc->pollin_eventfd, &buf, sizeof buf));
> > +        rioc->rpoll_events &= ~POLLIN;
> > +        break;
> > +    case SET_POLLOUT:
> > +        RETRY_ON_EINTR(read(rioc->pollout_eventfd, &buf, sizeof buf));
> > +        rioc->rpoll_events &= ~POLLOUT;
> > +        break;
> > +
> > +    /* the rsocket fd is not ready to rread/rwrite */
> > +    case CLEAR_POLLIN:
> > +        RETRY_ON_EINTR(read(rioc->pollin_eventfd, &buf, sizeof buf));
> > +        rioc->rpoll_events |= POLLIN;
> > +        break;
> > +    case CLEAR_POLLOUT:
> > +        RETRY_ON_EINTR(write(rioc->pollout_eventfd, &buf, sizeof buf));
> > +        rioc->rpoll_events |= POLLOUT;
> > +        break;
> > +    default:
> > +        break;
> > +    }
> > +
> > +    /* notify rpoller to rpoll() POLLIN/POLLOUT events */
> > +    if (notify_rpoller) {
> > +        qio_channel_rdma_notify_rpoller(rioc, RP_CMD_UPDATE);
> > +    }
> > +}
> > +
> > +static void qio_channel_rdma_rpoller_add_rioc(QIOChannelRDMA *rioc) {
> > +    if (rioc->index != -1) {
> > +        error_report("%s: rioc already exsits", __func__);
> > +        return;
> > +    }
> > +
> > +    rioc->index = ++rpoller.count;
> > +
> > +    if (rpoller.count + 1 > rpoller.size) {
> > +        rpoller.size *= 2;
> > +        rpoller.fds = g_renew(struct pollfd, rpoller.fds, rpoller.size);
> > +        rpoller.riocs = g_renew(QIOChannelRDMA *, rpoller.riocs,
> rpoller.size);
> > +    }
> > +
> > +    rpoller.fds[rioc->index].fd = rioc->fd;
> > +    rpoller.fds[rioc->index].events = rioc->rpoll_events;
> 
> The allotment of rioc fds and events to rpoller slots are sequential, but making
> the deletion also sequentials means that the del_rioc needs to be called in the
> exact opposite sequence as they were added (through add_rioc). Otherwise we
> leaves holes in between, and readditions might step on an already used slot.
> 
> Does this setup make sure that the above restriction is satisfied, or am I
> missing something?
> 

Actually, we use an O (1) algorithm for deletion, that is, each time we replace the array element to be deleted with the last one.
Pls see qio_channel_rdma_rpoller_del_rioc():

   rpoller.fds[rioc->index] = rpoller.fds[rpoller.count];

> > +    rpoller.riocs[rioc->index] = rioc; }
> > +
> > +static void qio_channel_rdma_rpoller_del_rioc(QIOChannelRDMA *rioc) {
> > +    if (rioc->index == -1) {
> > +        error_report("%s: rioc not exsits", __func__);
> > +        return;
> > +    }
> > +
> > +    rpoller.fds[rioc->index] = rpoller.fds[rpoller.count];
> 
> Should this be rpoller.count-1?
> 
No. the first element is the sockpairs' fd. Pls see qio_channel_rdma_rpoller_start():

   rpoller.fds[0].fd = rpoller.sock[1];
   rpoller.fds[0].events = POLLIN;


Regards,
-Gonglei

> > +    rpoller.riocs[rioc->index] = rpoller.riocs[rpoller.count];
> > +    rpoller.riocs[rioc->index]->index = rioc->index;
> > +    rpoller.count--;
> > +
> > +    close(rioc->pollin_eventfd);
> > +    close(rioc->pollout_eventfd);
> > +    rioc->index = -1;
> > +    rioc->rpoll_events = 0;
> > +}
> > +
> > +static void qio_channel_rdma_rpoller_update_ioc(QIOChannelRDMA *rioc)
> > +{
> > +    if (rioc->index == -1) {
> > +        error_report("%s: rioc not exsits", __func__);
> > +        return;
> > +    }
> > +
> > +    rpoller.fds[rioc->index].fd = rioc->fd;
> > +    rpoller.fds[rioc->index].events = rioc->rpoll_events; }
> > +
> > +static void qio_channel_rdma_rpoller_process_msg(void)
> > +{
> > +    RpollerMsg msg;
> > +    int ret;
> > +
> > +    ret = RETRY_ON_EINTR(read(rpoller.sock[1], &msg, sizeof msg));
> > +    if (ret != sizeof msg) {
> > +        error_report("%s: rpoller failed to recv msg: %s", __func__,
> > +                     strerror(errno));
> > +        return;
> > +    }
> > +
> > +    switch (msg.cmd) {
> > +    case RP_CMD_ADD_IOC:
> > +        qio_channel_rdma_rpoller_add_rioc(msg.rioc);
> > +        break;
> > +    case RP_CMD_DEL_IOC:
> > +        qio_channel_rdma_rpoller_del_rioc(msg.rioc);
> > +        break;
> > +    case RP_CMD_UPDATE:
> > +        qio_channel_rdma_rpoller_update_ioc(msg.rioc);
> > +        break;
> > +    default:
> > +        break;
> > +    }
> > +}
> > +
> > +static void qio_channel_rdma_rpoller_cleanup(void)
> > +{
> > +    close(rpoller.sock[0]);
> > +    close(rpoller.sock[1]);
> > +    rpoller.sock[0] = -1;
> > +    rpoller.sock[1] = -1;
> > +    g_free(rpoller.fds);
> > +    g_free(rpoller.riocs);
> > +    rpoller.fds = NULL;
> > +    rpoller.riocs = NULL;
> > +    rpoller.count = 0;
> > +    rpoller.size = 0;
> > +    rpoller.is_running = false;
> > +}
> > +
> > +static void *qio_channel_rdma_rpoller_thread(void *opaque) {
> > +    int i, ret, error_events = POLLERR | POLLHUP | POLLNVAL;
> > +
> > +    do {
> > +        ret = rpoll(rpoller.fds, rpoller.count + 1, -1);
> > +        if (ret < 0 && errno != -EINTR) {
> > +            error_report("%s: rpoll() error: %s", __func__,
> strerror(errno));
> > +            break;
> > +        }
> > +
> > +        for (i = 1; i <= rpoller.count; i++) {
> > +            if (rpoller.fds[i].revents & (POLLIN | error_events)) {
> > +                qio_channel_rdma_update_poll_event(rpoller.riocs[i],
> SET_POLLIN,
> > +                                                   false);
> > +                rpoller.fds[i].events &= ~POLLIN;
> > +            }
> > +            if (rpoller.fds[i].revents & (POLLOUT | error_events)) {
> > +                qio_channel_rdma_update_poll_event(rpoller.riocs[i],
> > +
> SET_POLLOUT, false);
> > +                rpoller.fds[i].events &= ~POLLOUT;
> > +            }
> > +            /* ignore this fd */
> > +            if (rpoller.fds[i].revents & (error_events)) {
> > +                rpoller.fds[i].fd = -1;
> > +            }
> > +        }
> > +
> > +        if (rpoller.fds[0].revents) {
> > +            qio_channel_rdma_rpoller_process_msg();
> > +        }
> > +    } while (rpoller.count >= 1);
> > +
> > +    qio_channel_rdma_rpoller_cleanup();
> > +
> > +    return NULL;
> > +}
> > +
> > +static void qio_channel_rdma_rpoller_start(void)
> > +{
> > +    if (qatomic_xchg(&rpoller.is_running, true)) {
> > +        return;
> > +    }
> > +
> > +    if (qemu_socketpair(AF_UNIX, SOCK_STREAM, 0, rpoller.sock)) {
> > +        rpoller.is_running = false;
> > +        error_report("%s: failed to create socketpair %s", __func__,
> > +                     strerror(errno));
> > +        return;
> > +    }
> > +
> > +    rpoller.count = 0;
> > +    rpoller.size = 4;
> > +    rpoller.fds = g_malloc0_n(rpoller.size, sizeof(struct pollfd));
> > +    rpoller.riocs = g_malloc0_n(rpoller.size, sizeof(QIOChannelRDMA *));
> > +    rpoller.fds[0].fd = rpoller.sock[1];
> > +    rpoller.fds[0].events = POLLIN;
> > +
> > +    qemu_thread_create(&rpoller.thread, "qio-channel-rdma-rpoller",
> > +                       qio_channel_rdma_rpoller_thread, NULL,
> > +                       QEMU_THREAD_JOINABLE); }
> > +
> > +static void qio_channel_rdma_add_rioc_to_rpoller(QIOChannelRDMA
> > +*rioc) {
> > +    int flags = EFD_CLOEXEC | EFD_NONBLOCK;
> > +
> > +    /*
> > +     * A single eventfd is either readable or writable. A single eventfd
> cannot
> > +     * represent a state where it is neither readable nor writable. so use
> two
> > +     * eventfds here.
> > +     */
> > +    rioc->pollin_eventfd = eventfd(0, flags);
> > +    rioc->pollout_eventfd = eventfd(0, flags);
> > +    /* pollout_eventfd with the value 0, means writable, make it
> unwritable */
> > +    qio_channel_rdma_update_poll_event(rioc, CLEAR_POLLOUT, false);
> > +
> > +    /* tell the rpoller to rpoll() events on rioc->socketfd */
> > +    rioc->rpoll_events = POLLIN | POLLOUT;
> > +    qio_channel_rdma_notify_rpoller(rioc, RP_CMD_ADD_IOC); }
> > +
> >  QIOChannelRDMA *qio_channel_rdma_new(void)  {
> >      QIOChannelRDMA *rioc;
> >      QIOChannel *ioc;
> >
> > +    qio_channel_rdma_rpoller_start();
> > +    if (!rpoller.is_running) {
> > +        return NULL;
> > +    }
> > +
> >      rioc =
> QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
> >      ioc = QIO_CHANNEL(rioc);
> >      qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
> @@
> > -125,6 +393,8 @@ retry:
> >          goto out;
> >      }
> >
> > +    qio_channel_rdma_add_rioc_to_rpoller(rioc);
> > +
> >  out:
> >      if (ret) {
> >          trace_qio_channel_rdma_connect_fail(rioc);
> > @@ -211,6 +481,8 @@ int
> qio_channel_rdma_listen_sync(QIOChannelRDMA *rioc, InetSocketAddress
> *addr,
> >      qio_channel_set_feature(QIO_CHANNEL(rioc),
> QIO_CHANNEL_FEATURE_LISTEN);
> >      trace_qio_channel_rdma_listen_complete(rioc, fd);
> >
> > +    qio_channel_rdma_add_rioc_to_rpoller(rioc);
> > +
> >  out:
> >      if (ret) {
> >          trace_qio_channel_rdma_listen_fail(rioc);
> > @@ -267,8 +539,10 @@ void
> qio_channel_rdma_listen_async(QIOChannelRDMA *ioc, InetSocketAddress
> *addr,
> >                             qio_channel_listen_worker_free,
> context);
> > }
> >
> > -QIOChannelRDMA *qio_channel_rdma_accept(QIOChannelRDMA *rioc,
> Error
> > **errp)
> > +QIOChannelRDMA *coroutine_mixed_fn
> qio_channel_rdma_accept(QIOChannelRDMA *rioc,
> > +
> Error
> > +**errp)
> >  {
> > +    QIOChannel *ioc = QIO_CHANNEL(rioc);
> >      QIOChannelRDMA *cioc;
> >
> >      cioc = qio_channel_rdma_new();
> > @@ -283,6 +557,17 @@ retry:
> >          if (errno == EINTR) {
> >              goto retry;
> >          }
> > +        if (errno == EAGAIN) {
> > +            if (!(rioc->rpoll_events & POLLIN)) {
> > +                qio_channel_rdma_update_poll_event(rioc,
> CLEAR_POLLIN, true);
> > +            }
> > +            if (qemu_in_coroutine()) {
> > +                qio_channel_yield(ioc, G_IO_IN);
> > +            } else {
> > +                qio_channel_wait(ioc, G_IO_IN);
> > +            }
> > +            goto retry;
> > +        }
> >          error_setg_errno(errp, errno, "Unable to accept connection");
> >          goto error;
> >      }
> > @@ -294,6 +579,8 @@ retry:
> >          goto error;
> >      }
> >
> > +    qio_channel_rdma_add_rioc_to_rpoller(cioc);
> > +
> >      trace_qio_channel_rdma_accept_complete(rioc, cioc, cioc->fd);
> >      return cioc;
> >
> > @@ -307,6 +594,10 @@ static void qio_channel_rdma_init(Object *obj)  {
> >      QIOChannelRDMA *ioc = QIO_CHANNEL_RDMA(obj);
> >      ioc->fd = -1;
> > +    ioc->pollin_eventfd = -1;
> > +    ioc->pollout_eventfd = -1;
> > +    ioc->index = -1;
> > +    ioc->rpoll_events = 0;
> >  }
> >
> >  static void qio_channel_rdma_finalize(Object *obj) @@ -314,6 +605,7
> > @@ static void qio_channel_rdma_finalize(Object *obj)
> >      QIOChannelRDMA *ioc = QIO_CHANNEL_RDMA(obj);
> >
> >      if (ioc->fd != -1) {
> > +        qio_channel_rdma_notify_rpoller(ioc, RP_CMD_DEL_IOC);
> >          rclose(ioc->fd);
> >          ioc->fd = -1;
> >      }
> > @@ -330,6 +622,12 @@ static ssize_t
> qio_channel_rdma_readv(QIOChannel
> > *ioc, const struct iovec *iov,
> >  retry:
> >      ret = rreadv(rioc->fd, iov, niov);
> >      if (ret < 0) {
> > +        if (errno == EAGAIN) {
> > +            if (!(rioc->rpoll_events & POLLIN)) {
> > +                qio_channel_rdma_update_poll_event(rioc,
> CLEAR_POLLIN, true);
> > +            }
> > +            return QIO_CHANNEL_ERR_BLOCK;
> > +        }
> >          if (errno == EINTR) {
> >              goto retry;
> >          }
> > @@ -351,6 +649,12 @@ static ssize_t
> qio_channel_rdma_writev(QIOChannel
> > *ioc, const struct iovec *iov,
> >  retry:
> >      ret = rwritev(rioc->fd, iov, niov);
> >      if (ret <= 0) {
> > +        if (errno == EAGAIN) {
> > +            if (!(rioc->rpoll_events & POLLOUT)) {
> > +                qio_channel_rdma_update_poll_event(rioc,
> CLEAR_POLLOUT, true);
> > +            }
> > +            return QIO_CHANNEL_ERR_BLOCK;
> > +        }
> >          if (errno == EINTR) {
> >              goto retry;
> >          }
> > @@ -361,6 +665,28 @@ retry:
> >      return ret;
> >  }
> >
> > +static int qio_channel_rdma_set_blocking(QIOChannel *ioc, bool enabled,
> > +                                         Error **errp
> G_GNUC_UNUSED)
> > +{
> > +    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
> > +    int flags, ret;
> > +
> > +    flags = rfcntl(rioc->fd, F_GETFL);
> > +    if (enabled) {
> > +        flags &= ~O_NONBLOCK;
> > +    } else {
> > +        flags |= O_NONBLOCK;
> > +    }
> > +
> > +    ret = rfcntl(rioc->fd, F_SETFL, flags);
> > +    if (ret) {
> > +        error_setg_errno(errp, errno,
> > +                         "Unable to rfcntl rsocket fd with flags %d",
> flags);
> > +    }
> > +
> > +    return ret;
> > +}
> > +
> >  static void qio_channel_rdma_set_delay(QIOChannel *ioc, bool enabled)
> > {
> >      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); @@ -374,6
> +700,7 @@
> > static int qio_channel_rdma_close(QIOChannel *ioc, Error **errp)
> >      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
> >
> >      if (rioc->fd != -1) {
> > +        qio_channel_rdma_notify_rpoller(rioc, RP_CMD_DEL_IOC);
> >          rclose(rioc->fd);
> >          rioc->fd = -1;
> >      }
> > @@ -408,6 +735,37 @@ static int qio_channel_rdma_shutdown(QIOChannel
> *ioc, QIOChannelShutdown how,
> >      return 0;
> >  }
> >
> > +static void
> > +qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc, AioContext
> *read_ctx,
> > +                                    IOHandler *io_read,
> AioContext *write_ctx,
> > +                                    IOHandler *io_write, void
> > +*opaque) {
> > +    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
> > +
> > +    qio_channel_util_set_aio_fd_handler(rioc->pollin_eventfd, read_ctx,
> io_read,
> > +                                        rioc->pollout_eventfd,
> write_ctx,
> > +                                        io_write, opaque); }
> > +
> > +static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
> > +                                              GIOCondition
> condition)
> > +{
> > +    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
> > +
> > +    switch (condition) {
> > +    case G_IO_IN:
> > +        return qio_channel_create_fd_watch(ioc, rioc->pollin_eventfd,
> > +                                           condition);
> > +    case G_IO_OUT:
> > +        return qio_channel_create_fd_watch(ioc, rioc->pollout_eventfd,
> > +                                           condition);
> > +    default:
> > +        error_report("%s: do not support watch 0x%x event", __func__,
> > +                     condition);
> > +        return NULL;
> > +    }
> > +}
> > +
> >  static void qio_channel_rdma_class_init(ObjectClass *klass,
> >                                          void *class_data
> > G_GNUC_UNUSED)  { @@ -415,9 +773,12 @@ static void
> > qio_channel_rdma_class_init(ObjectClass *klass,
> >
> >      ioc_klass->io_writev = qio_channel_rdma_writev;
> >      ioc_klass->io_readv = qio_channel_rdma_readv;
> > +    ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
> >      ioc_klass->io_close = qio_channel_rdma_close;
> >      ioc_klass->io_shutdown = qio_channel_rdma_shutdown;
> >      ioc_klass->io_set_delay = qio_channel_rdma_set_delay;
> > +    ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
> > +    ioc_klass->io_set_aio_fd_handler =
> > + qio_channel_rdma_set_aio_fd_handler;
> >  }
> >
> >  static const TypeInfo qio_channel_rdma_info = {
> > --
> > 2.43.0
> >
> >
Daniel P. Berrangé June 7, 2024, 9:04 a.m. UTC | #3
On Tue, Jun 04, 2024 at 08:14:09PM +0800, Gonglei wrote:
> From: Jialin Wang <wangjialin23@huawei.com>
> 
> It is not feasible to obtain RDMA completion queue notifications
> through poll/ppoll on the rsocket fd. Therefore, we create a thread
> named rpoller for each rsocket fd and two eventfds: pollin_eventfd
> and pollout_eventfd.
> 
> When using io_create_watch or io_set_aio_fd_handler waits for POLLIN
> or POLLOUT events, it will actually poll/ppoll on the pollin_eventfd
> and pollout_eventfd instead of the rsocket fd.
> 
> The rpoller rpoll() on the rsocket fd to receive POLLIN and POLLOUT
> events.
> When a POLLIN event occurs, the rpoller write the pollin_eventfd,
> and then poll/ppoll will return the POLLIN event.
> When a POLLOUT event occurs, the rpoller read the pollout_eventfd,
> and then poll/ppoll will return the POLLOUT event.
> 
> For a non-blocking rsocket fd, if rread/rwrite returns EAGAIN, it will
> read/write the pollin/pollout_eventfd, preventing poll/ppoll from
> returning POLLIN/POLLOUT events.
> 
> Known limitations:
> 
>   For a blocking rsocket fd, if we use io_create_watch to wait for
>   POLLIN or POLLOUT events, since the rsocket fd is blocking, we
>   cannot determine when it is not ready to read/write as we can with
>   non-blocking fds. Therefore, when an event occurs, it will occurs
>   always, potentially leave the qemu hanging. So we need be cautious
>   to avoid hanging when using io_create_watch .
> 
> Luckily, channel-rdma works well in coroutines :)
> 
> Signed-off-by: Jialin Wang <wangjialin23@huawei.com>
> Signed-off-by: Gonglei <arei.gonglei@huawei.com>
> ---
>  include/io/channel-rdma.h |  15 +-
>  io/channel-rdma.c         | 363 +++++++++++++++++++++++++++++++++++++-
>  2 files changed, 376 insertions(+), 2 deletions(-)
> 
> diff --git a/include/io/channel-rdma.h b/include/io/channel-rdma.h
> index 8cab2459e5..cb56127d76 100644
> --- a/include/io/channel-rdma.h
> +++ b/include/io/channel-rdma.h
> @@ -47,6 +47,18 @@ struct QIOChannelRDMA {
>      socklen_t localAddrLen;
>      struct sockaddr_storage remoteAddr;
>      socklen_t remoteAddrLen;
> +
> +    /* private */
> +
> +    /* qemu g_poll/ppoll() POLLIN event on it */
> +    int pollin_eventfd;
> +    /* qemu g_poll/ppoll() POLLOUT event on it */
> +    int pollout_eventfd;
> +
> +    /* the index in the rpoller's fds array */
> +    int index;
> +    /* rpoller will rpoll() rpoll_events on the rsocket fd */
> +    short int rpoll_events;
>  };
>  
>  /**
> @@ -147,6 +159,7 @@ void qio_channel_rdma_listen_async(QIOChannelRDMA *ioc, InetSocketAddress *addr,
>   *
>   * Returns: the new client channel, or NULL on error
>   */
> -QIOChannelRDMA *qio_channel_rdma_accept(QIOChannelRDMA *ioc, Error **errp);
> +QIOChannelRDMA *coroutine_mixed_fn qio_channel_rdma_accept(QIOChannelRDMA *ioc,
> +                                                           Error **errp);
>  
>  #endif /* QIO_CHANNEL_RDMA_H */
> diff --git a/io/channel-rdma.c b/io/channel-rdma.c
> index 92c362df52..9792add5cf 100644
> --- a/io/channel-rdma.c
> +++ b/io/channel-rdma.c
> @@ -23,10 +23,15 @@
>  
>  #include "qemu/osdep.h"
>  #include "io/channel-rdma.h"
> +#include "io/channel-util.h"
> +#include "io/channel-watch.h"
>  #include "io/channel.h"
>  #include "qapi/clone-visitor.h"
>  #include "qapi/error.h"
>  #include "qapi/qapi-visit-sockets.h"
> +#include "qemu/atomic.h"
> +#include "qemu/error-report.h"
> +#include "qemu/thread.h"
>  #include "trace.h"
>  #include <errno.h>
>  #include <netdb.h>
> @@ -39,11 +44,274 @@
>  #include <sys/poll.h>
>  #include <unistd.h>
>  
> +typedef enum {
> +    CLEAR_POLLIN,
> +    CLEAR_POLLOUT,
> +    SET_POLLIN,
> +    SET_POLLOUT,
> +} UpdateEvent;
> +
> +typedef enum {
> +    RP_CMD_ADD_IOC,
> +    RP_CMD_DEL_IOC,
> +    RP_CMD_UPDATE,
> +} RpollerCMD;
> +
> +typedef struct {
> +    RpollerCMD cmd;
> +    QIOChannelRDMA *rioc;
> +} RpollerMsg;
> +
> +/*
> + * rpoll() on the rsocket fd with rpoll_events, when POLLIN/POLLOUT event
> + * occurs, it will write/read the pollin_eventfd/pollout_eventfd to allow
> + * qemu g_poll/ppoll() get the POLLIN/POLLOUT event
> + */
> +static struct Rpoller {
> +    QemuThread thread;
> +    bool is_running;
> +    int sock[2];
> +    int count; /* the number of rsocket fds being rpoll() */
> +    int size; /* the size of fds/riocs */
> +    struct pollfd *fds;
> +    QIOChannelRDMA **riocs;
> +} rpoller;
> +
> +static void qio_channel_rdma_notify_rpoller(QIOChannelRDMA *rioc,
> +                                            RpollerCMD cmd)
> +{
> +    RpollerMsg msg;
> +    int ret;
> +
> +    msg.cmd = cmd;
> +    msg.rioc = rioc;
> +
> +    ret = RETRY_ON_EINTR(write(rpoller.sock[0], &msg, sizeof msg));

So this message is handled asynchronously by the poll thread, but
you're not acquiring any reference on teh 'rioc' object. So there's
the possibility that the owner of the rioc calls 'unref' free'ing
the last reference, before the poll thread has finished processing
the message.  IMHO the poll thread must hold a reference on the
rioc for as long as it needs the object.

> +    if (ret != sizeof msg) {
> +        error_report("%s: failed to send msg, errno: %d", __func__, errno);
> +    }

I feel like this should be propagated to the caller via an Error **errp
parameter.

> +}
> +
> +static void qio_channel_rdma_update_poll_event(QIOChannelRDMA *rioc,
> +                                               UpdateEvent action,
> +                                               bool notify_rpoller)
> +{
> +    /* An eventfd with the value of ULLONG_MAX - 1 is readable but unwritable */
> +    unsigned long long buf = ULLONG_MAX - 1;
> +
> +    switch (action) {
> +    /* only rpoller do SET_* action, to allow qemu ppoll() get the event */
> +    case SET_POLLIN:
> +        RETRY_ON_EINTR(write(rioc->pollin_eventfd, &buf, sizeof buf));
> +        rioc->rpoll_events &= ~POLLIN;
> +        break;
> +    case SET_POLLOUT:
> +        RETRY_ON_EINTR(read(rioc->pollout_eventfd, &buf, sizeof buf));
> +        rioc->rpoll_events &= ~POLLOUT;
> +        break;
> +
> +    /* the rsocket fd is not ready to rread/rwrite */
> +    case CLEAR_POLLIN:
> +        RETRY_ON_EINTR(read(rioc->pollin_eventfd, &buf, sizeof buf));
> +        rioc->rpoll_events |= POLLIN;
> +        break;
> +    case CLEAR_POLLOUT:
> +        RETRY_ON_EINTR(write(rioc->pollout_eventfd, &buf, sizeof buf));
> +        rioc->rpoll_events |= POLLOUT;
> +        break;
> +    default:
> +        break;
> +    }
> +
> +    /* notify rpoller to rpoll() POLLIN/POLLOUT events */
> +    if (notify_rpoller) {
> +        qio_channel_rdma_notify_rpoller(rioc, RP_CMD_UPDATE);
> +    }
> +}
> +
> +static void qio_channel_rdma_rpoller_add_rioc(QIOChannelRDMA *rioc)
> +{
> +    if (rioc->index != -1) {
> +        error_report("%s: rioc already exsits", __func__);
> +        return;
> +    }
> +
> +    rioc->index = ++rpoller.count;
> +
> +    if (rpoller.count + 1 > rpoller.size) {
> +        rpoller.size *= 2;
> +        rpoller.fds = g_renew(struct pollfd, rpoller.fds, rpoller.size);
> +        rpoller.riocs = g_renew(QIOChannelRDMA *, rpoller.riocs, rpoller.size);
> +    }
> +
> +    rpoller.fds[rioc->index].fd = rioc->fd;
> +    rpoller.fds[rioc->index].events = rioc->rpoll_events;
> +    rpoller.riocs[rioc->index] = rioc;
> +}
> +
> +static void qio_channel_rdma_rpoller_del_rioc(QIOChannelRDMA *rioc)
> +{
> +    if (rioc->index == -1) {
> +        error_report("%s: rioc not exsits", __func__);
> +        return;
> +    }
> +
> +    rpoller.fds[rioc->index] = rpoller.fds[rpoller.count];
> +    rpoller.riocs[rioc->index] = rpoller.riocs[rpoller.count];
> +    rpoller.riocs[rioc->index]->index = rioc->index;
> +    rpoller.count--;
> +
> +    close(rioc->pollin_eventfd);
> +    close(rioc->pollout_eventfd);
> +    rioc->index = -1;
> +    rioc->rpoll_events = 0;
> +}
> +
> +static void qio_channel_rdma_rpoller_update_ioc(QIOChannelRDMA *rioc)
> +{
> +    if (rioc->index == -1) {
> +        error_report("%s: rioc not exsits", __func__);
> +        return;
> +    }
> +
> +    rpoller.fds[rioc->index].fd = rioc->fd;
> +    rpoller.fds[rioc->index].events = rioc->rpoll_events;
> +}
> +
> +static void qio_channel_rdma_rpoller_process_msg(void)
> +{
> +    RpollerMsg msg;
> +    int ret;
> +
> +    ret = RETRY_ON_EINTR(read(rpoller.sock[1], &msg, sizeof msg));
> +    if (ret != sizeof msg) {
> +        error_report("%s: rpoller failed to recv msg: %s", __func__,
> +                     strerror(errno));
> +        return;
> +    }
> +
> +    switch (msg.cmd) {
> +    case RP_CMD_ADD_IOC:
> +        qio_channel_rdma_rpoller_add_rioc(msg.rioc);
> +        break;
> +    case RP_CMD_DEL_IOC:
> +        qio_channel_rdma_rpoller_del_rioc(msg.rioc);
> +        break;
> +    case RP_CMD_UPDATE:
> +        qio_channel_rdma_rpoller_update_ioc(msg.rioc);
> +        break;
> +    default:
> +        break;
> +    }
> +}
> +
> +static void qio_channel_rdma_rpoller_cleanup(void)
> +{
> +    close(rpoller.sock[0]);
> +    close(rpoller.sock[1]);
> +    rpoller.sock[0] = -1;
> +    rpoller.sock[1] = -1;
> +    g_free(rpoller.fds);
> +    g_free(rpoller.riocs);
> +    rpoller.fds = NULL;
> +    rpoller.riocs = NULL;
> +    rpoller.count = 0;
> +    rpoller.size = 0;
> +    rpoller.is_running = false;
> +}
> +
> +static void *qio_channel_rdma_rpoller_thread(void *opaque)
> +{
> +    int i, ret, error_events = POLLERR | POLLHUP | POLLNVAL;
> +
> +    do {
> +        ret = rpoll(rpoller.fds, rpoller.count + 1, -1);
> +        if (ret < 0 && errno != -EINTR) {
> +            error_report("%s: rpoll() error: %s", __func__, strerror(errno));
> +            break;
> +        }
> +
> +        for (i = 1; i <= rpoller.count; i++) {
> +            if (rpoller.fds[i].revents & (POLLIN | error_events)) {
> +                qio_channel_rdma_update_poll_event(rpoller.riocs[i], SET_POLLIN,
> +                                                   false);
> +                rpoller.fds[i].events &= ~POLLIN;
> +            }
> +            if (rpoller.fds[i].revents & (POLLOUT | error_events)) {
> +                qio_channel_rdma_update_poll_event(rpoller.riocs[i],
> +                                                   SET_POLLOUT, false);
> +                rpoller.fds[i].events &= ~POLLOUT;
> +            }
> +            /* ignore this fd */
> +            if (rpoller.fds[i].revents & (error_events)) {
> +                rpoller.fds[i].fd = -1;
> +            }
> +        }
> +
> +        if (rpoller.fds[0].revents) {
> +            qio_channel_rdma_rpoller_process_msg();
> +        }
> +    } while (rpoller.count >= 1);
> +
> +    qio_channel_rdma_rpoller_cleanup();
> +
> +    return NULL;
> +}
> +
> +static void qio_channel_rdma_rpoller_start(void)
> +{
> +    if (qatomic_xchg(&rpoller.is_running, true)) {
> +        return;
> +    }
> +
> +    if (qemu_socketpair(AF_UNIX, SOCK_STREAM, 0, rpoller.sock)) {
> +        rpoller.is_running = false;
> +        error_report("%s: failed to create socketpair %s", __func__,
> +                     strerror(errno));
> +        return;
> +    }
> +
> +    rpoller.count = 0;
> +    rpoller.size = 4;
> +    rpoller.fds = g_malloc0_n(rpoller.size, sizeof(struct pollfd));
> +    rpoller.riocs = g_malloc0_n(rpoller.size, sizeof(QIOChannelRDMA *));
> +    rpoller.fds[0].fd = rpoller.sock[1];
> +    rpoller.fds[0].events = POLLIN;
> +
> +    qemu_thread_create(&rpoller.thread, "qio-channel-rdma-rpoller",
> +                       qio_channel_rdma_rpoller_thread, NULL,
> +                       QEMU_THREAD_JOINABLE);
> +}
> +
> +static void qio_channel_rdma_add_rioc_to_rpoller(QIOChannelRDMA *rioc)
> +{
> +    int flags = EFD_CLOEXEC | EFD_NONBLOCK;
> +
> +    /*
> +     * A single eventfd is either readable or writable. A single eventfd cannot
> +     * represent a state where it is neither readable nor writable. so use two
> +     * eventfds here.
> +     */
> +    rioc->pollin_eventfd = eventfd(0, flags);
> +    rioc->pollout_eventfd = eventfd(0, flags);
> +    /* pollout_eventfd with the value 0, means writable, make it unwritable */
> +    qio_channel_rdma_update_poll_event(rioc, CLEAR_POLLOUT, false);
> +
> +    /* tell the rpoller to rpoll() events on rioc->socketfd */
> +    rioc->rpoll_events = POLLIN | POLLOUT;
> +    qio_channel_rdma_notify_rpoller(rioc, RP_CMD_ADD_IOC);
> +}
> +
>  QIOChannelRDMA *qio_channel_rdma_new(void)
>  {
>      QIOChannelRDMA *rioc;
>      QIOChannel *ioc;
>  
> +    qio_channel_rdma_rpoller_start();
> +    if (!rpoller.is_running) {
> +        return NULL;
> +    }
> +
>      rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
>      ioc = QIO_CHANNEL(rioc);
>      qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
> @@ -125,6 +393,8 @@ retry:
>          goto out;
>      }
>  
> +    qio_channel_rdma_add_rioc_to_rpoller(rioc);
> +
>  out:
>      if (ret) {
>          trace_qio_channel_rdma_connect_fail(rioc);
> @@ -211,6 +481,8 @@ int qio_channel_rdma_listen_sync(QIOChannelRDMA *rioc, InetSocketAddress *addr,
>      qio_channel_set_feature(QIO_CHANNEL(rioc), QIO_CHANNEL_FEATURE_LISTEN);
>      trace_qio_channel_rdma_listen_complete(rioc, fd);
>  
> +    qio_channel_rdma_add_rioc_to_rpoller(rioc);
> +
>  out:
>      if (ret) {
>          trace_qio_channel_rdma_listen_fail(rioc);
> @@ -267,8 +539,10 @@ void qio_channel_rdma_listen_async(QIOChannelRDMA *ioc, InetSocketAddress *addr,
>                             qio_channel_listen_worker_free, context);
>  }
>  
> -QIOChannelRDMA *qio_channel_rdma_accept(QIOChannelRDMA *rioc, Error **errp)
> +QIOChannelRDMA *coroutine_mixed_fn qio_channel_rdma_accept(QIOChannelRDMA *rioc,
> +                                                           Error **errp)
>  {
> +    QIOChannel *ioc = QIO_CHANNEL(rioc);
>      QIOChannelRDMA *cioc;
>  
>      cioc = qio_channel_rdma_new();
> @@ -283,6 +557,17 @@ retry:
>          if (errno == EINTR) {
>              goto retry;
>          }
> +        if (errno == EAGAIN) {
> +            if (!(rioc->rpoll_events & POLLIN)) {
> +                qio_channel_rdma_update_poll_event(rioc, CLEAR_POLLIN, true);
> +            }
> +            if (qemu_in_coroutine()) {
> +                qio_channel_yield(ioc, G_IO_IN);
> +            } else {
> +                qio_channel_wait(ioc, G_IO_IN);
> +            }
> +            goto retry;
> +        }
>          error_setg_errno(errp, errno, "Unable to accept connection");
>          goto error;
>      }
> @@ -294,6 +579,8 @@ retry:
>          goto error;
>      }
>  
> +    qio_channel_rdma_add_rioc_to_rpoller(cioc);
> +
>      trace_qio_channel_rdma_accept_complete(rioc, cioc, cioc->fd);
>      return cioc;
>  
> @@ -307,6 +594,10 @@ static void qio_channel_rdma_init(Object *obj)
>  {
>      QIOChannelRDMA *ioc = QIO_CHANNEL_RDMA(obj);
>      ioc->fd = -1;
> +    ioc->pollin_eventfd = -1;
> +    ioc->pollout_eventfd = -1;
> +    ioc->index = -1;
> +    ioc->rpoll_events = 0;
>  }
>  
>  static void qio_channel_rdma_finalize(Object *obj)
> @@ -314,6 +605,7 @@ static void qio_channel_rdma_finalize(Object *obj)
>      QIOChannelRDMA *ioc = QIO_CHANNEL_RDMA(obj);
>  
>      if (ioc->fd != -1) {
> +        qio_channel_rdma_notify_rpoller(ioc, RP_CMD_DEL_IOC);

This is unsafe.

When finalize runs, the object has dropped its last reference and
is about to be free()d.  The notify_rpoller() method, however,
sends an async message to the poll thread, which the poll thread
will end up processing after the rioc is free()d. ie a use-after-free.

If you take my earlier suggestion that the poll thread should hold
its own reference on the ioc, then it becomes impossible for the
rioc to be freed while there is still an active I/O watch, and
thus this call can go away, and so will the use after free.

>          rclose(ioc->fd);
>          ioc->fd = -1;
>      }

With regards,
Daniel
Gao,Shiyuan" via June 7, 2024, 9:28 a.m. UTC | #4
Hi Daniel,

> -----Original Message-----
> From: Daniel P. Berrangé [mailto:berrange@redhat.com]
> Sent: Friday, June 7, 2024 5:04 PM
> To: Gonglei (Arei) <arei.gonglei@huawei.com>
> Cc: qemu-devel@nongnu.org; peterx@redhat.com; yu.zhang@ionos.com;
> mgalaxy@akamai.com; elmar.gerdes@ionos.com; zhengchuan
> <zhengchuan@huawei.com>; armbru@redhat.com; lizhijian@fujitsu.com;
> pbonzini@redhat.com; mst@redhat.com; Xiexiangyou
> <xiexiangyou@huawei.com>; linux-rdma@vger.kernel.org; lixiao (H)
> <lixiao91@huawei.com>; jinpu.wang@ionos.com; Wangjialin
> <wangjialin23@huawei.com>
> Subject: Re: [PATCH 3/6] io/channel-rdma: support working in coroutine
> 
> On Tue, Jun 04, 2024 at 08:14:09PM +0800, Gonglei wrote:
> > From: Jialin Wang <wangjialin23@huawei.com>
> >
> > It is not feasible to obtain RDMA completion queue notifications
> > through poll/ppoll on the rsocket fd. Therefore, we create a thread
> > named rpoller for each rsocket fd and two eventfds: pollin_eventfd and
> > pollout_eventfd.
> >
> > When using io_create_watch or io_set_aio_fd_handler waits for POLLIN
> > or POLLOUT events, it will actually poll/ppoll on the pollin_eventfd
> > and pollout_eventfd instead of the rsocket fd.
> >
> > The rpoller rpoll() on the rsocket fd to receive POLLIN and POLLOUT
> > events.
> > When a POLLIN event occurs, the rpoller write the pollin_eventfd, and
> > then poll/ppoll will return the POLLIN event.
> > When a POLLOUT event occurs, the rpoller read the pollout_eventfd, and
> > then poll/ppoll will return the POLLOUT event.
> >
> > For a non-blocking rsocket fd, if rread/rwrite returns EAGAIN, it will
> > read/write the pollin/pollout_eventfd, preventing poll/ppoll from
> > returning POLLIN/POLLOUT events.
> >
> > Known limitations:
> >
> >   For a blocking rsocket fd, if we use io_create_watch to wait for
> >   POLLIN or POLLOUT events, since the rsocket fd is blocking, we
> >   cannot determine when it is not ready to read/write as we can with
> >   non-blocking fds. Therefore, when an event occurs, it will occurs
> >   always, potentially leave the qemu hanging. So we need be cautious
> >   to avoid hanging when using io_create_watch .
> >
> > Luckily, channel-rdma works well in coroutines :)
> >
> > Signed-off-by: Jialin Wang <wangjialin23@huawei.com>
> > Signed-off-by: Gonglei <arei.gonglei@huawei.com>
> > ---
> >  include/io/channel-rdma.h |  15 +-
> >  io/channel-rdma.c         | 363
> +++++++++++++++++++++++++++++++++++++-
> >  2 files changed, 376 insertions(+), 2 deletions(-)
> >
> > diff --git a/include/io/channel-rdma.h b/include/io/channel-rdma.h
> > index 8cab2459e5..cb56127d76 100644
> > --- a/include/io/channel-rdma.h
> > +++ b/include/io/channel-rdma.h
> > @@ -47,6 +47,18 @@ struct QIOChannelRDMA {
> >      socklen_t localAddrLen;
> >      struct sockaddr_storage remoteAddr;
> >      socklen_t remoteAddrLen;
> > +
> > +    /* private */
> > +
> > +    /* qemu g_poll/ppoll() POLLIN event on it */
> > +    int pollin_eventfd;
> > +    /* qemu g_poll/ppoll() POLLOUT event on it */
> > +    int pollout_eventfd;
> > +
> > +    /* the index in the rpoller's fds array */
> > +    int index;
> > +    /* rpoller will rpoll() rpoll_events on the rsocket fd */
> > +    short int rpoll_events;
> >  };
> >
> >  /**
> > @@ -147,6 +159,7 @@ void
> qio_channel_rdma_listen_async(QIOChannelRDMA *ioc, InetSocketAddress
> *addr,
> >   *
> >   * Returns: the new client channel, or NULL on error
> >   */
> > -QIOChannelRDMA *qio_channel_rdma_accept(QIOChannelRDMA *ioc,
> Error
> > **errp);
> > +QIOChannelRDMA *coroutine_mixed_fn
> qio_channel_rdma_accept(QIOChannelRDMA *ioc,
> > +
> Error
> > +**errp);
> >
> >  #endif /* QIO_CHANNEL_RDMA_H */
> > diff --git a/io/channel-rdma.c b/io/channel-rdma.c index
> > 92c362df52..9792add5cf 100644
> > --- a/io/channel-rdma.c
> > +++ b/io/channel-rdma.c
> > @@ -23,10 +23,15 @@
> >
> >  #include "qemu/osdep.h"
> >  #include "io/channel-rdma.h"
> > +#include "io/channel-util.h"
> > +#include "io/channel-watch.h"
> >  #include "io/channel.h"
> >  #include "qapi/clone-visitor.h"
> >  #include "qapi/error.h"
> >  #include "qapi/qapi-visit-sockets.h"
> > +#include "qemu/atomic.h"
> > +#include "qemu/error-report.h"
> > +#include "qemu/thread.h"
> >  #include "trace.h"
> >  #include <errno.h>
> >  #include <netdb.h>
> > @@ -39,11 +44,274 @@
> >  #include <sys/poll.h>
> >  #include <unistd.h>
> >
> > +typedef enum {
> > +    CLEAR_POLLIN,
> > +    CLEAR_POLLOUT,
> > +    SET_POLLIN,
> > +    SET_POLLOUT,
> > +} UpdateEvent;
> > +
> > +typedef enum {
> > +    RP_CMD_ADD_IOC,
> > +    RP_CMD_DEL_IOC,
> > +    RP_CMD_UPDATE,
> > +} RpollerCMD;
> > +
> > +typedef struct {
> > +    RpollerCMD cmd;
> > +    QIOChannelRDMA *rioc;
> > +} RpollerMsg;
> > +
> > +/*
> > + * rpoll() on the rsocket fd with rpoll_events, when POLLIN/POLLOUT
> > +event
> > + * occurs, it will write/read the pollin_eventfd/pollout_eventfd to
> > +allow
> > + * qemu g_poll/ppoll() get the POLLIN/POLLOUT event  */ static struct
> > +Rpoller {
> > +    QemuThread thread;
> > +    bool is_running;
> > +    int sock[2];
> > +    int count; /* the number of rsocket fds being rpoll() */
> > +    int size; /* the size of fds/riocs */
> > +    struct pollfd *fds;
> > +    QIOChannelRDMA **riocs;
> > +} rpoller;
> > +
> > +static void qio_channel_rdma_notify_rpoller(QIOChannelRDMA *rioc,
> > +                                            RpollerCMD cmd) {
> > +    RpollerMsg msg;
> > +    int ret;
> > +
> > +    msg.cmd = cmd;
> > +    msg.rioc = rioc;
> > +
> > +    ret = RETRY_ON_EINTR(write(rpoller.sock[0], &msg, sizeof msg));
> 
> So this message is handled asynchronously by the poll thread, but you're not
> acquiring any reference on teh 'rioc' object. So there's the possibility that the
> owner of the rioc calls 'unref' free'ing the last reference, before the poll thread
> has finished processing the message.  IMHO the poll thread must hold a
> reference on the rioc for as long as it needs the object.
> 
Yes. You're right.


> > +    if (ret != sizeof msg) {
> > +        error_report("%s: failed to send msg, errno: %d", __func__,
> errno);
> > +    }
> 
> I feel like this should be propagated to the caller via an Error **errp parameter.
> 

OK. 


> > +}
> > +
> > +static void qio_channel_rdma_update_poll_event(QIOChannelRDMA *rioc,
> > +                                               UpdateEvent
> action,
> > +                                               bool notify_rpoller)
> {
> > +    /* An eventfd with the value of ULLONG_MAX - 1 is readable but
> unwritable */
> > +    unsigned long long buf = ULLONG_MAX - 1;
> > +
> > +    switch (action) {
> > +    /* only rpoller do SET_* action, to allow qemu ppoll() get the event */
> > +    case SET_POLLIN:
> > +        RETRY_ON_EINTR(write(rioc->pollin_eventfd, &buf, sizeof buf));
> > +        rioc->rpoll_events &= ~POLLIN;
> > +        break;
> > +    case SET_POLLOUT:
> > +        RETRY_ON_EINTR(read(rioc->pollout_eventfd, &buf, sizeof buf));
> > +        rioc->rpoll_events &= ~POLLOUT;
> > +        break;
> > +
> > +    /* the rsocket fd is not ready to rread/rwrite */
> > +    case CLEAR_POLLIN:
> > +        RETRY_ON_EINTR(read(rioc->pollin_eventfd, &buf, sizeof buf));
> > +        rioc->rpoll_events |= POLLIN;
> > +        break;
> > +    case CLEAR_POLLOUT:
> > +        RETRY_ON_EINTR(write(rioc->pollout_eventfd, &buf, sizeof buf));
> > +        rioc->rpoll_events |= POLLOUT;
> > +        break;
> > +    default:
> > +        break;
> > +    }
> > +
> > +    /* notify rpoller to rpoll() POLLIN/POLLOUT events */
> > +    if (notify_rpoller) {
> > +        qio_channel_rdma_notify_rpoller(rioc, RP_CMD_UPDATE);
> > +    }
> > +}
> > +
> > +static void qio_channel_rdma_rpoller_add_rioc(QIOChannelRDMA *rioc) {
> > +    if (rioc->index != -1) {
> > +        error_report("%s: rioc already exsits", __func__);
> > +        return;
> > +    }
> > +
> > +    rioc->index = ++rpoller.count;
> > +
> > +    if (rpoller.count + 1 > rpoller.size) {
> > +        rpoller.size *= 2;
> > +        rpoller.fds = g_renew(struct pollfd, rpoller.fds, rpoller.size);
> > +        rpoller.riocs = g_renew(QIOChannelRDMA *, rpoller.riocs,
> rpoller.size);
> > +    }
> > +
> > +    rpoller.fds[rioc->index].fd = rioc->fd;
> > +    rpoller.fds[rioc->index].events = rioc->rpoll_events;
> > +    rpoller.riocs[rioc->index] = rioc; }
> > +
> > +static void qio_channel_rdma_rpoller_del_rioc(QIOChannelRDMA *rioc) {
> > +    if (rioc->index == -1) {
> > +        error_report("%s: rioc not exsits", __func__);
> > +        return;
> > +    }
> > +
> > +    rpoller.fds[rioc->index] = rpoller.fds[rpoller.count];
> > +    rpoller.riocs[rioc->index] = rpoller.riocs[rpoller.count];
> > +    rpoller.riocs[rioc->index]->index = rioc->index;
> > +    rpoller.count--;
> > +
> > +    close(rioc->pollin_eventfd);
> > +    close(rioc->pollout_eventfd);
> > +    rioc->index = -1;
> > +    rioc->rpoll_events = 0;
> > +}
> > +
> > +static void qio_channel_rdma_rpoller_update_ioc(QIOChannelRDMA *rioc)
> > +{
> > +    if (rioc->index == -1) {
> > +        error_report("%s: rioc not exsits", __func__);
> > +        return;
> > +    }
> > +
> > +    rpoller.fds[rioc->index].fd = rioc->fd;
> > +    rpoller.fds[rioc->index].events = rioc->rpoll_events; }
> > +
> > +static void qio_channel_rdma_rpoller_process_msg(void)
> > +{
> > +    RpollerMsg msg;
> > +    int ret;
> > +
> > +    ret = RETRY_ON_EINTR(read(rpoller.sock[1], &msg, sizeof msg));
> > +    if (ret != sizeof msg) {
> > +        error_report("%s: rpoller failed to recv msg: %s", __func__,
> > +                     strerror(errno));
> > +        return;
> > +    }
> > +
> > +    switch (msg.cmd) {
> > +    case RP_CMD_ADD_IOC:
> > +        qio_channel_rdma_rpoller_add_rioc(msg.rioc);
> > +        break;
> > +    case RP_CMD_DEL_IOC:
> > +        qio_channel_rdma_rpoller_del_rioc(msg.rioc);
> > +        break;
> > +    case RP_CMD_UPDATE:
> > +        qio_channel_rdma_rpoller_update_ioc(msg.rioc);
> > +        break;
> > +    default:
> > +        break;
> > +    }
> > +}
> > +
> > +static void qio_channel_rdma_rpoller_cleanup(void)
> > +{
> > +    close(rpoller.sock[0]);
> > +    close(rpoller.sock[1]);
> > +    rpoller.sock[0] = -1;
> > +    rpoller.sock[1] = -1;
> > +    g_free(rpoller.fds);
> > +    g_free(rpoller.riocs);
> > +    rpoller.fds = NULL;
> > +    rpoller.riocs = NULL;
> > +    rpoller.count = 0;
> > +    rpoller.size = 0;
> > +    rpoller.is_running = false;
> > +}
> > +
> > +static void *qio_channel_rdma_rpoller_thread(void *opaque) {
> > +    int i, ret, error_events = POLLERR | POLLHUP | POLLNVAL;
> > +
> > +    do {
> > +        ret = rpoll(rpoller.fds, rpoller.count + 1, -1);
> > +        if (ret < 0 && errno != -EINTR) {
> > +            error_report("%s: rpoll() error: %s", __func__,
> strerror(errno));
> > +            break;
> > +        }
> > +
> > +        for (i = 1; i <= rpoller.count; i++) {
> > +            if (rpoller.fds[i].revents & (POLLIN | error_events)) {
> > +                qio_channel_rdma_update_poll_event(rpoller.riocs[i],
> SET_POLLIN,
> > +                                                   false);
> > +                rpoller.fds[i].events &= ~POLLIN;
> > +            }
> > +            if (rpoller.fds[i].revents & (POLLOUT | error_events)) {
> > +                qio_channel_rdma_update_poll_event(rpoller.riocs[i],
> > +
> SET_POLLOUT, false);
> > +                rpoller.fds[i].events &= ~POLLOUT;
> > +            }
> > +            /* ignore this fd */
> > +            if (rpoller.fds[i].revents & (error_events)) {
> > +                rpoller.fds[i].fd = -1;
> > +            }
> > +        }
> > +
> > +        if (rpoller.fds[0].revents) {
> > +            qio_channel_rdma_rpoller_process_msg();
> > +        }
> > +    } while (rpoller.count >= 1);
> > +
> > +    qio_channel_rdma_rpoller_cleanup();
> > +
> > +    return NULL;
> > +}
> > +
> > +static void qio_channel_rdma_rpoller_start(void)
> > +{
> > +    if (qatomic_xchg(&rpoller.is_running, true)) {
> > +        return;
> > +    }
> > +
> > +    if (qemu_socketpair(AF_UNIX, SOCK_STREAM, 0, rpoller.sock)) {
> > +        rpoller.is_running = false;
> > +        error_report("%s: failed to create socketpair %s", __func__,
> > +                     strerror(errno));
> > +        return;
> > +    }
> > +
> > +    rpoller.count = 0;
> > +    rpoller.size = 4;
> > +    rpoller.fds = g_malloc0_n(rpoller.size, sizeof(struct pollfd));
> > +    rpoller.riocs = g_malloc0_n(rpoller.size, sizeof(QIOChannelRDMA *));
> > +    rpoller.fds[0].fd = rpoller.sock[1];
> > +    rpoller.fds[0].events = POLLIN;
> > +
> > +    qemu_thread_create(&rpoller.thread, "qio-channel-rdma-rpoller",
> > +                       qio_channel_rdma_rpoller_thread, NULL,
> > +                       QEMU_THREAD_JOINABLE); }
> > +
> > +static void qio_channel_rdma_add_rioc_to_rpoller(QIOChannelRDMA
> > +*rioc) {
> > +    int flags = EFD_CLOEXEC | EFD_NONBLOCK;
> > +
> > +    /*
> > +     * A single eventfd is either readable or writable. A single eventfd
> cannot
> > +     * represent a state where it is neither readable nor writable. so use
> two
> > +     * eventfds here.
> > +     */
> > +    rioc->pollin_eventfd = eventfd(0, flags);
> > +    rioc->pollout_eventfd = eventfd(0, flags);
> > +    /* pollout_eventfd with the value 0, means writable, make it
> unwritable */
> > +    qio_channel_rdma_update_poll_event(rioc, CLEAR_POLLOUT, false);
> > +
> > +    /* tell the rpoller to rpoll() events on rioc->socketfd */
> > +    rioc->rpoll_events = POLLIN | POLLOUT;
> > +    qio_channel_rdma_notify_rpoller(rioc, RP_CMD_ADD_IOC); }
> > +
> >  QIOChannelRDMA *qio_channel_rdma_new(void)  {
> >      QIOChannelRDMA *rioc;
> >      QIOChannel *ioc;
> >
> > +    qio_channel_rdma_rpoller_start();
> > +    if (!rpoller.is_running) {
> > +        return NULL;
> > +    }
> > +
> >      rioc =
> QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
> >      ioc = QIO_CHANNEL(rioc);
> >      qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
> @@
> > -125,6 +393,8 @@ retry:
> >          goto out;
> >      }
> >
> > +    qio_channel_rdma_add_rioc_to_rpoller(rioc);
> > +
> >  out:
> >      if (ret) {
> >          trace_qio_channel_rdma_connect_fail(rioc);
> > @@ -211,6 +481,8 @@ int
> qio_channel_rdma_listen_sync(QIOChannelRDMA *rioc, InetSocketAddress
> *addr,
> >      qio_channel_set_feature(QIO_CHANNEL(rioc),
> QIO_CHANNEL_FEATURE_LISTEN);
> >      trace_qio_channel_rdma_listen_complete(rioc, fd);
> >
> > +    qio_channel_rdma_add_rioc_to_rpoller(rioc);
> > +
> >  out:
> >      if (ret) {
> >          trace_qio_channel_rdma_listen_fail(rioc);
> > @@ -267,8 +539,10 @@ void
> qio_channel_rdma_listen_async(QIOChannelRDMA *ioc, InetSocketAddress
> *addr,
> >                             qio_channel_listen_worker_free,
> context);
> > }
> >
> > -QIOChannelRDMA *qio_channel_rdma_accept(QIOChannelRDMA *rioc,
> Error
> > **errp)
> > +QIOChannelRDMA *coroutine_mixed_fn
> qio_channel_rdma_accept(QIOChannelRDMA *rioc,
> > +
> Error
> > +**errp)
> >  {
> > +    QIOChannel *ioc = QIO_CHANNEL(rioc);
> >      QIOChannelRDMA *cioc;
> >
> >      cioc = qio_channel_rdma_new();
> > @@ -283,6 +557,17 @@ retry:
> >          if (errno == EINTR) {
> >              goto retry;
> >          }
> > +        if (errno == EAGAIN) {
> > +            if (!(rioc->rpoll_events & POLLIN)) {
> > +                qio_channel_rdma_update_poll_event(rioc,
> CLEAR_POLLIN, true);
> > +            }
> > +            if (qemu_in_coroutine()) {
> > +                qio_channel_yield(ioc, G_IO_IN);
> > +            } else {
> > +                qio_channel_wait(ioc, G_IO_IN);
> > +            }
> > +            goto retry;
> > +        }
> >          error_setg_errno(errp, errno, "Unable to accept connection");
> >          goto error;
> >      }
> > @@ -294,6 +579,8 @@ retry:
> >          goto error;
> >      }
> >
> > +    qio_channel_rdma_add_rioc_to_rpoller(cioc);
> > +
> >      trace_qio_channel_rdma_accept_complete(rioc, cioc, cioc->fd);
> >      return cioc;
> >
> > @@ -307,6 +594,10 @@ static void qio_channel_rdma_init(Object *obj)  {
> >      QIOChannelRDMA *ioc = QIO_CHANNEL_RDMA(obj);
> >      ioc->fd = -1;
> > +    ioc->pollin_eventfd = -1;
> > +    ioc->pollout_eventfd = -1;
> > +    ioc->index = -1;
> > +    ioc->rpoll_events = 0;
> >  }
> >
> >  static void qio_channel_rdma_finalize(Object *obj) @@ -314,6 +605,7
> > @@ static void qio_channel_rdma_finalize(Object *obj)
> >      QIOChannelRDMA *ioc = QIO_CHANNEL_RDMA(obj);
> >
> >      if (ioc->fd != -1) {
> > +        qio_channel_rdma_notify_rpoller(ioc, RP_CMD_DEL_IOC);
> 
> This is unsafe.
> 
> When finalize runs, the object has dropped its last reference and is about to be
> free()d.  The notify_rpoller() method, however, sends an async message to the
> poll thread, which the poll thread will end up processing after the rioc is free()d.
> ie a use-after-free.
> 
> If you take my earlier suggestion that the poll thread should hold its own
> reference on the ioc, then it becomes impossible for the rioc to be freed while
> there is still an active I/O watch, and thus this call can go away, and so will the
> use after free.
> 

Yes, will fixed in the next version.

Regards,
-Gonglei

> >          rclose(ioc->fd);
> >          ioc->fd = -1;
> >      }
> 
> With regards,
> Daniel
> --
> |: https://berrange.com      -o-
> https://www.flickr.com/photos/dberrange :|
> |: https://libvirt.org         -o-
> https://fstop138.berrange.com :|
> |: https://entangle-photo.org    -o-
> https://www.instagram.com/dberrange :|
>
Haris Iqbal June 7, 2024, 10:01 a.m. UTC | #5
On Fri, Jun 7, 2024 at 10:45 AM Gonglei (Arei) <arei.gonglei@huawei.com> wrote:
>
>
>
> > -----Original Message-----
> > From: Haris Iqbal [mailto:haris.iqbal@ionos.com]
> > Sent: Thursday, June 6, 2024 9:35 PM
> > To: Gonglei (Arei) <arei.gonglei@huawei.com>
> > Cc: qemu-devel@nongnu.org; peterx@redhat.com; yu.zhang@ionos.com;
> > mgalaxy@akamai.com; elmar.gerdes@ionos.com; zhengchuan
> > <zhengchuan@huawei.com>; berrange@redhat.com; armbru@redhat.com;
> > lizhijian@fujitsu.com; pbonzini@redhat.com; mst@redhat.com; Xiexiangyou
> > <xiexiangyou@huawei.com>; linux-rdma@vger.kernel.org; lixiao (H)
> > <lixiao91@huawei.com>; jinpu.wang@ionos.com; Wangjialin
> > <wangjialin23@huawei.com>
> > Subject: Re: [PATCH 3/6] io/channel-rdma: support working in coroutine
> >
> > On Tue, Jun 4, 2024 at 2:14 PM Gonglei <arei.gonglei@huawei.com> wrote:
> > >
> > > From: Jialin Wang <wangjialin23@huawei.com>
> > >
> > > It is not feasible to obtain RDMA completion queue notifications
> > > through poll/ppoll on the rsocket fd. Therefore, we create a thread
> > > named rpoller for each rsocket fd and two eventfds: pollin_eventfd and
> > > pollout_eventfd.
> > >
> > > When using io_create_watch or io_set_aio_fd_handler waits for POLLIN
> > > or POLLOUT events, it will actually poll/ppoll on the pollin_eventfd
> > > and pollout_eventfd instead of the rsocket fd.
> > >
> > > The rpoller rpoll() on the rsocket fd to receive POLLIN and POLLOUT
> > > events.
> > > When a POLLIN event occurs, the rpoller write the pollin_eventfd, and
> > > then poll/ppoll will return the POLLIN event.
> > > When a POLLOUT event occurs, the rpoller read the pollout_eventfd, and
> > > then poll/ppoll will return the POLLOUT event.
> > >
> > > For a non-blocking rsocket fd, if rread/rwrite returns EAGAIN, it will
> > > read/write the pollin/pollout_eventfd, preventing poll/ppoll from
> > > returning POLLIN/POLLOUT events.
> > >
> > > Known limitations:
> > >
> > >   For a blocking rsocket fd, if we use io_create_watch to wait for
> > >   POLLIN or POLLOUT events, since the rsocket fd is blocking, we
> > >   cannot determine when it is not ready to read/write as we can with
> > >   non-blocking fds. Therefore, when an event occurs, it will occurs
> > >   always, potentially leave the qemu hanging. So we need be cautious
> > >   to avoid hanging when using io_create_watch .
> > >
> > > Luckily, channel-rdma works well in coroutines :)
> > >
> > > Signed-off-by: Jialin Wang <wangjialin23@huawei.com>
> > > Signed-off-by: Gonglei <arei.gonglei@huawei.com>
> > > ---
> > >  include/io/channel-rdma.h |  15 +-
> > >  io/channel-rdma.c         | 363
> > +++++++++++++++++++++++++++++++++++++-
> > >  2 files changed, 376 insertions(+), 2 deletions(-)
> > >
> > > diff --git a/include/io/channel-rdma.h b/include/io/channel-rdma.h
> > > index 8cab2459e5..cb56127d76 100644
> > > --- a/include/io/channel-rdma.h
> > > +++ b/include/io/channel-rdma.h
> > > @@ -47,6 +47,18 @@ struct QIOChannelRDMA {
> > >      socklen_t localAddrLen;
> > >      struct sockaddr_storage remoteAddr;
> > >      socklen_t remoteAddrLen;
> > > +
> > > +    /* private */
> > > +
> > > +    /* qemu g_poll/ppoll() POLLIN event on it */
> > > +    int pollin_eventfd;
> > > +    /* qemu g_poll/ppoll() POLLOUT event on it */
> > > +    int pollout_eventfd;
> > > +
> > > +    /* the index in the rpoller's fds array */
> > > +    int index;
> > > +    /* rpoller will rpoll() rpoll_events on the rsocket fd */
> > > +    short int rpoll_events;
> > >  };
> > >
> > >  /**
> > > @@ -147,6 +159,7 @@ void
> > qio_channel_rdma_listen_async(QIOChannelRDMA *ioc, InetSocketAddress
> > *addr,
> > >   *
> > >   * Returns: the new client channel, or NULL on error
> > >   */
> > > -QIOChannelRDMA *qio_channel_rdma_accept(QIOChannelRDMA *ioc,
> > Error
> > > **errp);
> > > +QIOChannelRDMA *coroutine_mixed_fn
> > qio_channel_rdma_accept(QIOChannelRDMA *ioc,
> > > +
> > Error
> > > +**errp);
> > >
> > >  #endif /* QIO_CHANNEL_RDMA_H */
> > > diff --git a/io/channel-rdma.c b/io/channel-rdma.c index
> > > 92c362df52..9792add5cf 100644
> > > --- a/io/channel-rdma.c
> > > +++ b/io/channel-rdma.c
> > > @@ -23,10 +23,15 @@
> > >
> > >  #include "qemu/osdep.h"
> > >  #include "io/channel-rdma.h"
> > > +#include "io/channel-util.h"
> > > +#include "io/channel-watch.h"
> > >  #include "io/channel.h"
> > >  #include "qapi/clone-visitor.h"
> > >  #include "qapi/error.h"
> > >  #include "qapi/qapi-visit-sockets.h"
> > > +#include "qemu/atomic.h"
> > > +#include "qemu/error-report.h"
> > > +#include "qemu/thread.h"
> > >  #include "trace.h"
> > >  #include <errno.h>
> > >  #include <netdb.h>
> > > @@ -39,11 +44,274 @@
> > >  #include <sys/poll.h>
> > >  #include <unistd.h>
> > >
> > > +typedef enum {
> > > +    CLEAR_POLLIN,
> > > +    CLEAR_POLLOUT,
> > > +    SET_POLLIN,
> > > +    SET_POLLOUT,
> > > +} UpdateEvent;
> > > +
> > > +typedef enum {
> > > +    RP_CMD_ADD_IOC,
> > > +    RP_CMD_DEL_IOC,
> > > +    RP_CMD_UPDATE,
> > > +} RpollerCMD;
> > > +
> > > +typedef struct {
> > > +    RpollerCMD cmd;
> > > +    QIOChannelRDMA *rioc;
> > > +} RpollerMsg;
> > > +
> > > +/*
> > > + * rpoll() on the rsocket fd with rpoll_events, when POLLIN/POLLOUT
> > > +event
> > > + * occurs, it will write/read the pollin_eventfd/pollout_eventfd to
> > > +allow
> > > + * qemu g_poll/ppoll() get the POLLIN/POLLOUT event  */ static struct
> > > +Rpoller {
> > > +    QemuThread thread;
> > > +    bool is_running;
> > > +    int sock[2];
> > > +    int count; /* the number of rsocket fds being rpoll() */
> > > +    int size; /* the size of fds/riocs */
> > > +    struct pollfd *fds;
> > > +    QIOChannelRDMA **riocs;
> > > +} rpoller;
> > > +
> > > +static void qio_channel_rdma_notify_rpoller(QIOChannelRDMA *rioc,
> > > +                                            RpollerCMD cmd) {
> > > +    RpollerMsg msg;
> > > +    int ret;
> > > +
> > > +    msg.cmd = cmd;
> > > +    msg.rioc = rioc;
> > > +
> > > +    ret = RETRY_ON_EINTR(write(rpoller.sock[0], &msg, sizeof msg));
> > > +    if (ret != sizeof msg) {
> > > +        error_report("%s: failed to send msg, errno: %d", __func__,
> > errno);
> > > +    }
> > > +}
> > > +
> > > +static void qio_channel_rdma_update_poll_event(QIOChannelRDMA *rioc,
> > > +                                               UpdateEvent
> > action,
> > > +                                               bool notify_rpoller)
> > {
> > > +    /* An eventfd with the value of ULLONG_MAX - 1 is readable but
> > unwritable */
> > > +    unsigned long long buf = ULLONG_MAX - 1;
> > > +
> > > +    switch (action) {
> > > +    /* only rpoller do SET_* action, to allow qemu ppoll() get the event */
> > > +    case SET_POLLIN:
> > > +        RETRY_ON_EINTR(write(rioc->pollin_eventfd, &buf, sizeof buf));
> > > +        rioc->rpoll_events &= ~POLLIN;
> > > +        break;
> > > +    case SET_POLLOUT:
> > > +        RETRY_ON_EINTR(read(rioc->pollout_eventfd, &buf, sizeof buf));
> > > +        rioc->rpoll_events &= ~POLLOUT;
> > > +        break;
> > > +
> > > +    /* the rsocket fd is not ready to rread/rwrite */
> > > +    case CLEAR_POLLIN:
> > > +        RETRY_ON_EINTR(read(rioc->pollin_eventfd, &buf, sizeof buf));
> > > +        rioc->rpoll_events |= POLLIN;
> > > +        break;
> > > +    case CLEAR_POLLOUT:
> > > +        RETRY_ON_EINTR(write(rioc->pollout_eventfd, &buf, sizeof buf));
> > > +        rioc->rpoll_events |= POLLOUT;
> > > +        break;
> > > +    default:
> > > +        break;
> > > +    }
> > > +
> > > +    /* notify rpoller to rpoll() POLLIN/POLLOUT events */
> > > +    if (notify_rpoller) {
> > > +        qio_channel_rdma_notify_rpoller(rioc, RP_CMD_UPDATE);
> > > +    }
> > > +}
> > > +
> > > +static void qio_channel_rdma_rpoller_add_rioc(QIOChannelRDMA *rioc) {
> > > +    if (rioc->index != -1) {
> > > +        error_report("%s: rioc already exsits", __func__);
> > > +        return;
> > > +    }
> > > +
> > > +    rioc->index = ++rpoller.count;
> > > +
> > > +    if (rpoller.count + 1 > rpoller.size) {
> > > +        rpoller.size *= 2;
> > > +        rpoller.fds = g_renew(struct pollfd, rpoller.fds, rpoller.size);
> > > +        rpoller.riocs = g_renew(QIOChannelRDMA *, rpoller.riocs,
> > rpoller.size);
> > > +    }
> > > +
> > > +    rpoller.fds[rioc->index].fd = rioc->fd;
> > > +    rpoller.fds[rioc->index].events = rioc->rpoll_events;
> >
> > The allotment of rioc fds and events to rpoller slots are sequential, but making
> > the deletion also sequentials means that the del_rioc needs to be called in the
> > exact opposite sequence as they were added (through add_rioc). Otherwise we
> > leaves holes in between, and readditions might step on an already used slot.
> >
> > Does this setup make sure that the above restriction is satisfied, or am I
> > missing something?
> >
>
> Actually, we use an O (1) algorithm for deletion, that is, each time we replace the array element to be deleted with the last one.
> Pls see qio_channel_rdma_rpoller_del_rioc():

Ah yes. I missed that. Thanks for the response.

>
>    rpoller.fds[rioc->index] = rpoller.fds[rpoller.count];
>
> > > +    rpoller.riocs[rioc->index] = rioc; }
> > > +
> > > +static void qio_channel_rdma_rpoller_del_rioc(QIOChannelRDMA *rioc) {
> > > +    if (rioc->index == -1) {
> > > +        error_report("%s: rioc not exsits", __func__);
> > > +        return;
> > > +    }
> > > +
> > > +    rpoller.fds[rioc->index] = rpoller.fds[rpoller.count];
> >
> > Should this be rpoller.count-1?
> >
> No. the first element is the sockpairs' fd. Pls see qio_channel_rdma_rpoller_start():
>
>    rpoller.fds[0].fd = rpoller.sock[1];
>    rpoller.fds[0].events = POLLIN;
>
>
> Regards,
> -Gonglei
>
> > > +    rpoller.riocs[rioc->index] = rpoller.riocs[rpoller.count];
> > > +    rpoller.riocs[rioc->index]->index = rioc->index;
> > > +    rpoller.count--;
> > > +
> > > +    close(rioc->pollin_eventfd);
> > > +    close(rioc->pollout_eventfd);
> > > +    rioc->index = -1;
> > > +    rioc->rpoll_events = 0;
> > > +}
> > > +
> > > +static void qio_channel_rdma_rpoller_update_ioc(QIOChannelRDMA *rioc)
> > > +{
> > > +    if (rioc->index == -1) {
> > > +        error_report("%s: rioc not exsits", __func__);
> > > +        return;
> > > +    }
> > > +
> > > +    rpoller.fds[rioc->index].fd = rioc->fd;
> > > +    rpoller.fds[rioc->index].events = rioc->rpoll_events; }
> > > +
> > > +static void qio_channel_rdma_rpoller_process_msg(void)
> > > +{
> > > +    RpollerMsg msg;
> > > +    int ret;
> > > +
> > > +    ret = RETRY_ON_EINTR(read(rpoller.sock[1], &msg, sizeof msg));
> > > +    if (ret != sizeof msg) {
> > > +        error_report("%s: rpoller failed to recv msg: %s", __func__,
> > > +                     strerror(errno));
> > > +        return;
> > > +    }
> > > +
> > > +    switch (msg.cmd) {
> > > +    case RP_CMD_ADD_IOC:
> > > +        qio_channel_rdma_rpoller_add_rioc(msg.rioc);
> > > +        break;
> > > +    case RP_CMD_DEL_IOC:
> > > +        qio_channel_rdma_rpoller_del_rioc(msg.rioc);
> > > +        break;
> > > +    case RP_CMD_UPDATE:
> > > +        qio_channel_rdma_rpoller_update_ioc(msg.rioc);
> > > +        break;
> > > +    default:
> > > +        break;
> > > +    }
> > > +}
> > > +
> > > +static void qio_channel_rdma_rpoller_cleanup(void)
> > > +{
> > > +    close(rpoller.sock[0]);
> > > +    close(rpoller.sock[1]);
> > > +    rpoller.sock[0] = -1;
> > > +    rpoller.sock[1] = -1;
> > > +    g_free(rpoller.fds);
> > > +    g_free(rpoller.riocs);
> > > +    rpoller.fds = NULL;
> > > +    rpoller.riocs = NULL;
> > > +    rpoller.count = 0;
> > > +    rpoller.size = 0;
> > > +    rpoller.is_running = false;
> > > +}
> > > +
> > > +static void *qio_channel_rdma_rpoller_thread(void *opaque) {
> > > +    int i, ret, error_events = POLLERR | POLLHUP | POLLNVAL;
> > > +
> > > +    do {
> > > +        ret = rpoll(rpoller.fds, rpoller.count + 1, -1);
> > > +        if (ret < 0 && errno != -EINTR) {
> > > +            error_report("%s: rpoll() error: %s", __func__,
> > strerror(errno));
> > > +            break;
> > > +        }
> > > +
> > > +        for (i = 1; i <= rpoller.count; i++) {
> > > +            if (rpoller.fds[i].revents & (POLLIN | error_events)) {
> > > +                qio_channel_rdma_update_poll_event(rpoller.riocs[i],
> > SET_POLLIN,
> > > +                                                   false);
> > > +                rpoller.fds[i].events &= ~POLLIN;
> > > +            }
> > > +            if (rpoller.fds[i].revents & (POLLOUT | error_events)) {
> > > +                qio_channel_rdma_update_poll_event(rpoller.riocs[i],
> > > +
> > SET_POLLOUT, false);
> > > +                rpoller.fds[i].events &= ~POLLOUT;
> > > +            }
> > > +            /* ignore this fd */
> > > +            if (rpoller.fds[i].revents & (error_events)) {
> > > +                rpoller.fds[i].fd = -1;
> > > +            }
> > > +        }
> > > +
> > > +        if (rpoller.fds[0].revents) {
> > > +            qio_channel_rdma_rpoller_process_msg();
> > > +        }
> > > +    } while (rpoller.count >= 1);
> > > +
> > > +    qio_channel_rdma_rpoller_cleanup();
> > > +
> > > +    return NULL;
> > > +}
> > > +
> > > +static void qio_channel_rdma_rpoller_start(void)
> > > +{
> > > +    if (qatomic_xchg(&rpoller.is_running, true)) {
> > > +        return;
> > > +    }
> > > +
> > > +    if (qemu_socketpair(AF_UNIX, SOCK_STREAM, 0, rpoller.sock)) {
> > > +        rpoller.is_running = false;
> > > +        error_report("%s: failed to create socketpair %s", __func__,
> > > +                     strerror(errno));
> > > +        return;
> > > +    }
> > > +
> > > +    rpoller.count = 0;
> > > +    rpoller.size = 4;
> > > +    rpoller.fds = g_malloc0_n(rpoller.size, sizeof(struct pollfd));
> > > +    rpoller.riocs = g_malloc0_n(rpoller.size, sizeof(QIOChannelRDMA *));
> > > +    rpoller.fds[0].fd = rpoller.sock[1];
> > > +    rpoller.fds[0].events = POLLIN;
> > > +
> > > +    qemu_thread_create(&rpoller.thread, "qio-channel-rdma-rpoller",
> > > +                       qio_channel_rdma_rpoller_thread, NULL,
> > > +                       QEMU_THREAD_JOINABLE); }
> > > +
> > > +static void qio_channel_rdma_add_rioc_to_rpoller(QIOChannelRDMA
> > > +*rioc) {
> > > +    int flags = EFD_CLOEXEC | EFD_NONBLOCK;
> > > +
> > > +    /*
> > > +     * A single eventfd is either readable or writable. A single eventfd
> > cannot
> > > +     * represent a state where it is neither readable nor writable. so use
> > two
> > > +     * eventfds here.
> > > +     */
> > > +    rioc->pollin_eventfd = eventfd(0, flags);
> > > +    rioc->pollout_eventfd = eventfd(0, flags);
> > > +    /* pollout_eventfd with the value 0, means writable, make it
> > unwritable */
> > > +    qio_channel_rdma_update_poll_event(rioc, CLEAR_POLLOUT, false);
> > > +
> > > +    /* tell the rpoller to rpoll() events on rioc->socketfd */
> > > +    rioc->rpoll_events = POLLIN | POLLOUT;
> > > +    qio_channel_rdma_notify_rpoller(rioc, RP_CMD_ADD_IOC); }
> > > +
> > >  QIOChannelRDMA *qio_channel_rdma_new(void)  {
> > >      QIOChannelRDMA *rioc;
> > >      QIOChannel *ioc;
> > >
> > > +    qio_channel_rdma_rpoller_start();
> > > +    if (!rpoller.is_running) {
> > > +        return NULL;
> > > +    }
> > > +
> > >      rioc =
> > QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
> > >      ioc = QIO_CHANNEL(rioc);
> > >      qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
> > @@
> > > -125,6 +393,8 @@ retry:
> > >          goto out;
> > >      }
> > >
> > > +    qio_channel_rdma_add_rioc_to_rpoller(rioc);
> > > +
> > >  out:
> > >      if (ret) {
> > >          trace_qio_channel_rdma_connect_fail(rioc);
> > > @@ -211,6 +481,8 @@ int
> > qio_channel_rdma_listen_sync(QIOChannelRDMA *rioc, InetSocketAddress
> > *addr,
> > >      qio_channel_set_feature(QIO_CHANNEL(rioc),
> > QIO_CHANNEL_FEATURE_LISTEN);
> > >      trace_qio_channel_rdma_listen_complete(rioc, fd);
> > >
> > > +    qio_channel_rdma_add_rioc_to_rpoller(rioc);
> > > +
> > >  out:
> > >      if (ret) {
> > >          trace_qio_channel_rdma_listen_fail(rioc);
> > > @@ -267,8 +539,10 @@ void
> > qio_channel_rdma_listen_async(QIOChannelRDMA *ioc, InetSocketAddress
> > *addr,
> > >                             qio_channel_listen_worker_free,
> > context);
> > > }
> > >
> > > -QIOChannelRDMA *qio_channel_rdma_accept(QIOChannelRDMA *rioc,
> > Error
> > > **errp)
> > > +QIOChannelRDMA *coroutine_mixed_fn
> > qio_channel_rdma_accept(QIOChannelRDMA *rioc,
> > > +
> > Error
> > > +**errp)
> > >  {
> > > +    QIOChannel *ioc = QIO_CHANNEL(rioc);
> > >      QIOChannelRDMA *cioc;
> > >
> > >      cioc = qio_channel_rdma_new();
> > > @@ -283,6 +557,17 @@ retry:
> > >          if (errno == EINTR) {
> > >              goto retry;
> > >          }
> > > +        if (errno == EAGAIN) {
> > > +            if (!(rioc->rpoll_events & POLLIN)) {
> > > +                qio_channel_rdma_update_poll_event(rioc,
> > CLEAR_POLLIN, true);
> > > +            }
> > > +            if (qemu_in_coroutine()) {
> > > +                qio_channel_yield(ioc, G_IO_IN);
> > > +            } else {
> > > +                qio_channel_wait(ioc, G_IO_IN);
> > > +            }
> > > +            goto retry;
> > > +        }
> > >          error_setg_errno(errp, errno, "Unable to accept connection");
> > >          goto error;
> > >      }
> > > @@ -294,6 +579,8 @@ retry:
> > >          goto error;
> > >      }
> > >
> > > +    qio_channel_rdma_add_rioc_to_rpoller(cioc);
> > > +
> > >      trace_qio_channel_rdma_accept_complete(rioc, cioc, cioc->fd);
> > >      return cioc;
> > >
> > > @@ -307,6 +594,10 @@ static void qio_channel_rdma_init(Object *obj)  {
> > >      QIOChannelRDMA *ioc = QIO_CHANNEL_RDMA(obj);
> > >      ioc->fd = -1;
> > > +    ioc->pollin_eventfd = -1;
> > > +    ioc->pollout_eventfd = -1;
> > > +    ioc->index = -1;
> > > +    ioc->rpoll_events = 0;
> > >  }
> > >
> > >  static void qio_channel_rdma_finalize(Object *obj) @@ -314,6 +605,7
> > > @@ static void qio_channel_rdma_finalize(Object *obj)
> > >      QIOChannelRDMA *ioc = QIO_CHANNEL_RDMA(obj);
> > >
> > >      if (ioc->fd != -1) {
> > > +        qio_channel_rdma_notify_rpoller(ioc, RP_CMD_DEL_IOC);
> > >          rclose(ioc->fd);
> > >          ioc->fd = -1;
> > >      }
> > > @@ -330,6 +622,12 @@ static ssize_t
> > qio_channel_rdma_readv(QIOChannel
> > > *ioc, const struct iovec *iov,
> > >  retry:
> > >      ret = rreadv(rioc->fd, iov, niov);
> > >      if (ret < 0) {
> > > +        if (errno == EAGAIN) {
> > > +            if (!(rioc->rpoll_events & POLLIN)) {
> > > +                qio_channel_rdma_update_poll_event(rioc,
> > CLEAR_POLLIN, true);
> > > +            }
> > > +            return QIO_CHANNEL_ERR_BLOCK;
> > > +        }
> > >          if (errno == EINTR) {
> > >              goto retry;
> > >          }
> > > @@ -351,6 +649,12 @@ static ssize_t
> > qio_channel_rdma_writev(QIOChannel
> > > *ioc, const struct iovec *iov,
> > >  retry:
> > >      ret = rwritev(rioc->fd, iov, niov);
> > >      if (ret <= 0) {
> > > +        if (errno == EAGAIN) {
> > > +            if (!(rioc->rpoll_events & POLLOUT)) {
> > > +                qio_channel_rdma_update_poll_event(rioc,
> > CLEAR_POLLOUT, true);
> > > +            }
> > > +            return QIO_CHANNEL_ERR_BLOCK;
> > > +        }
> > >          if (errno == EINTR) {
> > >              goto retry;
> > >          }
> > > @@ -361,6 +665,28 @@ retry:
> > >      return ret;
> > >  }
> > >
> > > +static int qio_channel_rdma_set_blocking(QIOChannel *ioc, bool enabled,
> > > +                                         Error **errp
> > G_GNUC_UNUSED)
> > > +{
> > > +    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
> > > +    int flags, ret;
> > > +
> > > +    flags = rfcntl(rioc->fd, F_GETFL);
> > > +    if (enabled) {
> > > +        flags &= ~O_NONBLOCK;
> > > +    } else {
> > > +        flags |= O_NONBLOCK;
> > > +    }
> > > +
> > > +    ret = rfcntl(rioc->fd, F_SETFL, flags);
> > > +    if (ret) {
> > > +        error_setg_errno(errp, errno,
> > > +                         "Unable to rfcntl rsocket fd with flags %d",
> > flags);
> > > +    }
> > > +
> > > +    return ret;
> > > +}
> > > +
> > >  static void qio_channel_rdma_set_delay(QIOChannel *ioc, bool enabled)
> > > {
> > >      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc); @@ -374,6
> > +700,7 @@
> > > static int qio_channel_rdma_close(QIOChannel *ioc, Error **errp)
> > >      QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
> > >
> > >      if (rioc->fd != -1) {
> > > +        qio_channel_rdma_notify_rpoller(rioc, RP_CMD_DEL_IOC);
> > >          rclose(rioc->fd);
> > >          rioc->fd = -1;
> > >      }
> > > @@ -408,6 +735,37 @@ static int qio_channel_rdma_shutdown(QIOChannel
> > *ioc, QIOChannelShutdown how,
> > >      return 0;
> > >  }
> > >
> > > +static void
> > > +qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc, AioContext
> > *read_ctx,
> > > +                                    IOHandler *io_read,
> > AioContext *write_ctx,
> > > +                                    IOHandler *io_write, void
> > > +*opaque) {
> > > +    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
> > > +
> > > +    qio_channel_util_set_aio_fd_handler(rioc->pollin_eventfd, read_ctx,
> > io_read,
> > > +                                        rioc->pollout_eventfd,
> > write_ctx,
> > > +                                        io_write, opaque); }
> > > +
> > > +static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
> > > +                                              GIOCondition
> > condition)
> > > +{
> > > +    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
> > > +
> > > +    switch (condition) {
> > > +    case G_IO_IN:
> > > +        return qio_channel_create_fd_watch(ioc, rioc->pollin_eventfd,
> > > +                                           condition);
> > > +    case G_IO_OUT:
> > > +        return qio_channel_create_fd_watch(ioc, rioc->pollout_eventfd,
> > > +                                           condition);
> > > +    default:
> > > +        error_report("%s: do not support watch 0x%x event", __func__,
> > > +                     condition);
> > > +        return NULL;
> > > +    }
> > > +}
> > > +
> > >  static void qio_channel_rdma_class_init(ObjectClass *klass,
> > >                                          void *class_data
> > > G_GNUC_UNUSED)  { @@ -415,9 +773,12 @@ static void
> > > qio_channel_rdma_class_init(ObjectClass *klass,
> > >
> > >      ioc_klass->io_writev = qio_channel_rdma_writev;
> > >      ioc_klass->io_readv = qio_channel_rdma_readv;
> > > +    ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
> > >      ioc_klass->io_close = qio_channel_rdma_close;
> > >      ioc_klass->io_shutdown = qio_channel_rdma_shutdown;
> > >      ioc_klass->io_set_delay = qio_channel_rdma_set_delay;
> > > +    ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
> > > +    ioc_klass->io_set_aio_fd_handler =
> > > + qio_channel_rdma_set_aio_fd_handler;
> > >  }
> > >
> > >  static const TypeInfo qio_channel_rdma_info = {
> > > --
> > > 2.43.0
> > >
> > >
diff mbox series

Patch

diff --git a/include/io/channel-rdma.h b/include/io/channel-rdma.h
index 8cab2459e5..cb56127d76 100644
--- a/include/io/channel-rdma.h
+++ b/include/io/channel-rdma.h
@@ -47,6 +47,18 @@  struct QIOChannelRDMA {
     socklen_t localAddrLen;
     struct sockaddr_storage remoteAddr;
     socklen_t remoteAddrLen;
+
+    /* private */
+
+    /* qemu g_poll/ppoll() POLLIN event on it */
+    int pollin_eventfd;
+    /* qemu g_poll/ppoll() POLLOUT event on it */
+    int pollout_eventfd;
+
+    /* the index in the rpoller's fds array */
+    int index;
+    /* rpoller will rpoll() rpoll_events on the rsocket fd */
+    short int rpoll_events;
 };
 
 /**
@@ -147,6 +159,7 @@  void qio_channel_rdma_listen_async(QIOChannelRDMA *ioc, InetSocketAddress *addr,
  *
  * Returns: the new client channel, or NULL on error
  */
-QIOChannelRDMA *qio_channel_rdma_accept(QIOChannelRDMA *ioc, Error **errp);
+QIOChannelRDMA *coroutine_mixed_fn qio_channel_rdma_accept(QIOChannelRDMA *ioc,
+                                                           Error **errp);
 
 #endif /* QIO_CHANNEL_RDMA_H */
diff --git a/io/channel-rdma.c b/io/channel-rdma.c
index 92c362df52..9792add5cf 100644
--- a/io/channel-rdma.c
+++ b/io/channel-rdma.c
@@ -23,10 +23,15 @@ 
 
 #include "qemu/osdep.h"
 #include "io/channel-rdma.h"
+#include "io/channel-util.h"
+#include "io/channel-watch.h"
 #include "io/channel.h"
 #include "qapi/clone-visitor.h"
 #include "qapi/error.h"
 #include "qapi/qapi-visit-sockets.h"
+#include "qemu/atomic.h"
+#include "qemu/error-report.h"
+#include "qemu/thread.h"
 #include "trace.h"
 #include <errno.h>
 #include <netdb.h>
@@ -39,11 +44,274 @@ 
 #include <sys/poll.h>
 #include <unistd.h>
 
+typedef enum {
+    CLEAR_POLLIN,
+    CLEAR_POLLOUT,
+    SET_POLLIN,
+    SET_POLLOUT,
+} UpdateEvent;
+
+typedef enum {
+    RP_CMD_ADD_IOC,
+    RP_CMD_DEL_IOC,
+    RP_CMD_UPDATE,
+} RpollerCMD;
+
+typedef struct {
+    RpollerCMD cmd;
+    QIOChannelRDMA *rioc;
+} RpollerMsg;
+
+/*
+ * rpoll() on the rsocket fd with rpoll_events, when POLLIN/POLLOUT event
+ * occurs, it will write/read the pollin_eventfd/pollout_eventfd to allow
+ * qemu g_poll/ppoll() get the POLLIN/POLLOUT event
+ */
+static struct Rpoller {
+    QemuThread thread;
+    bool is_running;
+    int sock[2];
+    int count; /* the number of rsocket fds being rpoll() */
+    int size; /* the size of fds/riocs */
+    struct pollfd *fds;
+    QIOChannelRDMA **riocs;
+} rpoller;
+
+static void qio_channel_rdma_notify_rpoller(QIOChannelRDMA *rioc,
+                                            RpollerCMD cmd)
+{
+    RpollerMsg msg;
+    int ret;
+
+    msg.cmd = cmd;
+    msg.rioc = rioc;
+
+    ret = RETRY_ON_EINTR(write(rpoller.sock[0], &msg, sizeof msg));
+    if (ret != sizeof msg) {
+        error_report("%s: failed to send msg, errno: %d", __func__, errno);
+    }
+}
+
+static void qio_channel_rdma_update_poll_event(QIOChannelRDMA *rioc,
+                                               UpdateEvent action,
+                                               bool notify_rpoller)
+{
+    /* An eventfd with the value of ULLONG_MAX - 1 is readable but unwritable */
+    unsigned long long buf = ULLONG_MAX - 1;
+
+    switch (action) {
+    /* only rpoller do SET_* action, to allow qemu ppoll() get the event */
+    case SET_POLLIN:
+        RETRY_ON_EINTR(write(rioc->pollin_eventfd, &buf, sizeof buf));
+        rioc->rpoll_events &= ~POLLIN;
+        break;
+    case SET_POLLOUT:
+        RETRY_ON_EINTR(read(rioc->pollout_eventfd, &buf, sizeof buf));
+        rioc->rpoll_events &= ~POLLOUT;
+        break;
+
+    /* the rsocket fd is not ready to rread/rwrite */
+    case CLEAR_POLLIN:
+        RETRY_ON_EINTR(read(rioc->pollin_eventfd, &buf, sizeof buf));
+        rioc->rpoll_events |= POLLIN;
+        break;
+    case CLEAR_POLLOUT:
+        RETRY_ON_EINTR(write(rioc->pollout_eventfd, &buf, sizeof buf));
+        rioc->rpoll_events |= POLLOUT;
+        break;
+    default:
+        break;
+    }
+
+    /* notify rpoller to rpoll() POLLIN/POLLOUT events */
+    if (notify_rpoller) {
+        qio_channel_rdma_notify_rpoller(rioc, RP_CMD_UPDATE);
+    }
+}
+
+static void qio_channel_rdma_rpoller_add_rioc(QIOChannelRDMA *rioc)
+{
+    if (rioc->index != -1) {
+        error_report("%s: rioc already exsits", __func__);
+        return;
+    }
+
+    rioc->index = ++rpoller.count;
+
+    if (rpoller.count + 1 > rpoller.size) {
+        rpoller.size *= 2;
+        rpoller.fds = g_renew(struct pollfd, rpoller.fds, rpoller.size);
+        rpoller.riocs = g_renew(QIOChannelRDMA *, rpoller.riocs, rpoller.size);
+    }
+
+    rpoller.fds[rioc->index].fd = rioc->fd;
+    rpoller.fds[rioc->index].events = rioc->rpoll_events;
+    rpoller.riocs[rioc->index] = rioc;
+}
+
+static void qio_channel_rdma_rpoller_del_rioc(QIOChannelRDMA *rioc)
+{
+    if (rioc->index == -1) {
+        error_report("%s: rioc not exsits", __func__);
+        return;
+    }
+
+    rpoller.fds[rioc->index] = rpoller.fds[rpoller.count];
+    rpoller.riocs[rioc->index] = rpoller.riocs[rpoller.count];
+    rpoller.riocs[rioc->index]->index = rioc->index;
+    rpoller.count--;
+
+    close(rioc->pollin_eventfd);
+    close(rioc->pollout_eventfd);
+    rioc->index = -1;
+    rioc->rpoll_events = 0;
+}
+
+static void qio_channel_rdma_rpoller_update_ioc(QIOChannelRDMA *rioc)
+{
+    if (rioc->index == -1) {
+        error_report("%s: rioc not exsits", __func__);
+        return;
+    }
+
+    rpoller.fds[rioc->index].fd = rioc->fd;
+    rpoller.fds[rioc->index].events = rioc->rpoll_events;
+}
+
+static void qio_channel_rdma_rpoller_process_msg(void)
+{
+    RpollerMsg msg;
+    int ret;
+
+    ret = RETRY_ON_EINTR(read(rpoller.sock[1], &msg, sizeof msg));
+    if (ret != sizeof msg) {
+        error_report("%s: rpoller failed to recv msg: %s", __func__,
+                     strerror(errno));
+        return;
+    }
+
+    switch (msg.cmd) {
+    case RP_CMD_ADD_IOC:
+        qio_channel_rdma_rpoller_add_rioc(msg.rioc);
+        break;
+    case RP_CMD_DEL_IOC:
+        qio_channel_rdma_rpoller_del_rioc(msg.rioc);
+        break;
+    case RP_CMD_UPDATE:
+        qio_channel_rdma_rpoller_update_ioc(msg.rioc);
+        break;
+    default:
+        break;
+    }
+}
+
+static void qio_channel_rdma_rpoller_cleanup(void)
+{
+    close(rpoller.sock[0]);
+    close(rpoller.sock[1]);
+    rpoller.sock[0] = -1;
+    rpoller.sock[1] = -1;
+    g_free(rpoller.fds);
+    g_free(rpoller.riocs);
+    rpoller.fds = NULL;
+    rpoller.riocs = NULL;
+    rpoller.count = 0;
+    rpoller.size = 0;
+    rpoller.is_running = false;
+}
+
+static void *qio_channel_rdma_rpoller_thread(void *opaque)
+{
+    int i, ret, error_events = POLLERR | POLLHUP | POLLNVAL;
+
+    do {
+        ret = rpoll(rpoller.fds, rpoller.count + 1, -1);
+        if (ret < 0 && errno != -EINTR) {
+            error_report("%s: rpoll() error: %s", __func__, strerror(errno));
+            break;
+        }
+
+        for (i = 1; i <= rpoller.count; i++) {
+            if (rpoller.fds[i].revents & (POLLIN | error_events)) {
+                qio_channel_rdma_update_poll_event(rpoller.riocs[i], SET_POLLIN,
+                                                   false);
+                rpoller.fds[i].events &= ~POLLIN;
+            }
+            if (rpoller.fds[i].revents & (POLLOUT | error_events)) {
+                qio_channel_rdma_update_poll_event(rpoller.riocs[i],
+                                                   SET_POLLOUT, false);
+                rpoller.fds[i].events &= ~POLLOUT;
+            }
+            /* ignore this fd */
+            if (rpoller.fds[i].revents & (error_events)) {
+                rpoller.fds[i].fd = -1;
+            }
+        }
+
+        if (rpoller.fds[0].revents) {
+            qio_channel_rdma_rpoller_process_msg();
+        }
+    } while (rpoller.count >= 1);
+
+    qio_channel_rdma_rpoller_cleanup();
+
+    return NULL;
+}
+
+static void qio_channel_rdma_rpoller_start(void)
+{
+    if (qatomic_xchg(&rpoller.is_running, true)) {
+        return;
+    }
+
+    if (qemu_socketpair(AF_UNIX, SOCK_STREAM, 0, rpoller.sock)) {
+        rpoller.is_running = false;
+        error_report("%s: failed to create socketpair %s", __func__,
+                     strerror(errno));
+        return;
+    }
+
+    rpoller.count = 0;
+    rpoller.size = 4;
+    rpoller.fds = g_malloc0_n(rpoller.size, sizeof(struct pollfd));
+    rpoller.riocs = g_malloc0_n(rpoller.size, sizeof(QIOChannelRDMA *));
+    rpoller.fds[0].fd = rpoller.sock[1];
+    rpoller.fds[0].events = POLLIN;
+
+    qemu_thread_create(&rpoller.thread, "qio-channel-rdma-rpoller",
+                       qio_channel_rdma_rpoller_thread, NULL,
+                       QEMU_THREAD_JOINABLE);
+}
+
+static void qio_channel_rdma_add_rioc_to_rpoller(QIOChannelRDMA *rioc)
+{
+    int flags = EFD_CLOEXEC | EFD_NONBLOCK;
+
+    /*
+     * A single eventfd is either readable or writable. A single eventfd cannot
+     * represent a state where it is neither readable nor writable. so use two
+     * eventfds here.
+     */
+    rioc->pollin_eventfd = eventfd(0, flags);
+    rioc->pollout_eventfd = eventfd(0, flags);
+    /* pollout_eventfd with the value 0, means writable, make it unwritable */
+    qio_channel_rdma_update_poll_event(rioc, CLEAR_POLLOUT, false);
+
+    /* tell the rpoller to rpoll() events on rioc->socketfd */
+    rioc->rpoll_events = POLLIN | POLLOUT;
+    qio_channel_rdma_notify_rpoller(rioc, RP_CMD_ADD_IOC);
+}
+
 QIOChannelRDMA *qio_channel_rdma_new(void)
 {
     QIOChannelRDMA *rioc;
     QIOChannel *ioc;
 
+    qio_channel_rdma_rpoller_start();
+    if (!rpoller.is_running) {
+        return NULL;
+    }
+
     rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
     ioc = QIO_CHANNEL(rioc);
     qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
@@ -125,6 +393,8 @@  retry:
         goto out;
     }
 
+    qio_channel_rdma_add_rioc_to_rpoller(rioc);
+
 out:
     if (ret) {
         trace_qio_channel_rdma_connect_fail(rioc);
@@ -211,6 +481,8 @@  int qio_channel_rdma_listen_sync(QIOChannelRDMA *rioc, InetSocketAddress *addr,
     qio_channel_set_feature(QIO_CHANNEL(rioc), QIO_CHANNEL_FEATURE_LISTEN);
     trace_qio_channel_rdma_listen_complete(rioc, fd);
 
+    qio_channel_rdma_add_rioc_to_rpoller(rioc);
+
 out:
     if (ret) {
         trace_qio_channel_rdma_listen_fail(rioc);
@@ -267,8 +539,10 @@  void qio_channel_rdma_listen_async(QIOChannelRDMA *ioc, InetSocketAddress *addr,
                            qio_channel_listen_worker_free, context);
 }
 
-QIOChannelRDMA *qio_channel_rdma_accept(QIOChannelRDMA *rioc, Error **errp)
+QIOChannelRDMA *coroutine_mixed_fn qio_channel_rdma_accept(QIOChannelRDMA *rioc,
+                                                           Error **errp)
 {
+    QIOChannel *ioc = QIO_CHANNEL(rioc);
     QIOChannelRDMA *cioc;
 
     cioc = qio_channel_rdma_new();
@@ -283,6 +557,17 @@  retry:
         if (errno == EINTR) {
             goto retry;
         }
+        if (errno == EAGAIN) {
+            if (!(rioc->rpoll_events & POLLIN)) {
+                qio_channel_rdma_update_poll_event(rioc, CLEAR_POLLIN, true);
+            }
+            if (qemu_in_coroutine()) {
+                qio_channel_yield(ioc, G_IO_IN);
+            } else {
+                qio_channel_wait(ioc, G_IO_IN);
+            }
+            goto retry;
+        }
         error_setg_errno(errp, errno, "Unable to accept connection");
         goto error;
     }
@@ -294,6 +579,8 @@  retry:
         goto error;
     }
 
+    qio_channel_rdma_add_rioc_to_rpoller(cioc);
+
     trace_qio_channel_rdma_accept_complete(rioc, cioc, cioc->fd);
     return cioc;
 
@@ -307,6 +594,10 @@  static void qio_channel_rdma_init(Object *obj)
 {
     QIOChannelRDMA *ioc = QIO_CHANNEL_RDMA(obj);
     ioc->fd = -1;
+    ioc->pollin_eventfd = -1;
+    ioc->pollout_eventfd = -1;
+    ioc->index = -1;
+    ioc->rpoll_events = 0;
 }
 
 static void qio_channel_rdma_finalize(Object *obj)
@@ -314,6 +605,7 @@  static void qio_channel_rdma_finalize(Object *obj)
     QIOChannelRDMA *ioc = QIO_CHANNEL_RDMA(obj);
 
     if (ioc->fd != -1) {
+        qio_channel_rdma_notify_rpoller(ioc, RP_CMD_DEL_IOC);
         rclose(ioc->fd);
         ioc->fd = -1;
     }
@@ -330,6 +622,12 @@  static ssize_t qio_channel_rdma_readv(QIOChannel *ioc, const struct iovec *iov,
 retry:
     ret = rreadv(rioc->fd, iov, niov);
     if (ret < 0) {
+        if (errno == EAGAIN) {
+            if (!(rioc->rpoll_events & POLLIN)) {
+                qio_channel_rdma_update_poll_event(rioc, CLEAR_POLLIN, true);
+            }
+            return QIO_CHANNEL_ERR_BLOCK;
+        }
         if (errno == EINTR) {
             goto retry;
         }
@@ -351,6 +649,12 @@  static ssize_t qio_channel_rdma_writev(QIOChannel *ioc, const struct iovec *iov,
 retry:
     ret = rwritev(rioc->fd, iov, niov);
     if (ret <= 0) {
+        if (errno == EAGAIN) {
+            if (!(rioc->rpoll_events & POLLOUT)) {
+                qio_channel_rdma_update_poll_event(rioc, CLEAR_POLLOUT, true);
+            }
+            return QIO_CHANNEL_ERR_BLOCK;
+        }
         if (errno == EINTR) {
             goto retry;
         }
@@ -361,6 +665,28 @@  retry:
     return ret;
 }
 
+static int qio_channel_rdma_set_blocking(QIOChannel *ioc, bool enabled,
+                                         Error **errp G_GNUC_UNUSED)
+{
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+    int flags, ret;
+
+    flags = rfcntl(rioc->fd, F_GETFL);
+    if (enabled) {
+        flags &= ~O_NONBLOCK;
+    } else {
+        flags |= O_NONBLOCK;
+    }
+
+    ret = rfcntl(rioc->fd, F_SETFL, flags);
+    if (ret) {
+        error_setg_errno(errp, errno,
+                         "Unable to rfcntl rsocket fd with flags %d", flags);
+    }
+
+    return ret;
+}
+
 static void qio_channel_rdma_set_delay(QIOChannel *ioc, bool enabled)
 {
     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
@@ -374,6 +700,7 @@  static int qio_channel_rdma_close(QIOChannel *ioc, Error **errp)
     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
 
     if (rioc->fd != -1) {
+        qio_channel_rdma_notify_rpoller(rioc, RP_CMD_DEL_IOC);
         rclose(rioc->fd);
         rioc->fd = -1;
     }
@@ -408,6 +735,37 @@  static int qio_channel_rdma_shutdown(QIOChannel *ioc, QIOChannelShutdown how,
     return 0;
 }
 
+static void
+qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc, AioContext *read_ctx,
+                                    IOHandler *io_read, AioContext *write_ctx,
+                                    IOHandler *io_write, void *opaque)
+{
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+
+    qio_channel_util_set_aio_fd_handler(rioc->pollin_eventfd, read_ctx, io_read,
+                                        rioc->pollout_eventfd, write_ctx,
+                                        io_write, opaque);
+}
+
+static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
+                                              GIOCondition condition)
+{
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+
+    switch (condition) {
+    case G_IO_IN:
+        return qio_channel_create_fd_watch(ioc, rioc->pollin_eventfd,
+                                           condition);
+    case G_IO_OUT:
+        return qio_channel_create_fd_watch(ioc, rioc->pollout_eventfd,
+                                           condition);
+    default:
+        error_report("%s: do not support watch 0x%x event", __func__,
+                     condition);
+        return NULL;
+    }
+}
+
 static void qio_channel_rdma_class_init(ObjectClass *klass,
                                         void *class_data G_GNUC_UNUSED)
 {
@@ -415,9 +773,12 @@  static void qio_channel_rdma_class_init(ObjectClass *klass,
 
     ioc_klass->io_writev = qio_channel_rdma_writev;
     ioc_klass->io_readv = qio_channel_rdma_readv;
+    ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
     ioc_klass->io_close = qio_channel_rdma_close;
     ioc_klass->io_shutdown = qio_channel_rdma_shutdown;
     ioc_klass->io_set_delay = qio_channel_rdma_set_delay;
+    ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
+    ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler;
 }
 
 static const TypeInfo qio_channel_rdma_info = {