diff mbox series

[V3,4/6] block/rbd: migrate from aio to coroutines

Message ID 20210519142359.23083-5-pl@kamp.de (mailing list archive)
State New, archived
Headers show
Series block/rbd: migrate to coroutines and add write zeroes support | expand

Commit Message

Peter Lieven May 19, 2021, 2:23 p.m. UTC
Signed-off-by: Peter Lieven <pl@kamp.de>
---
 block/rbd.c | 255 ++++++++++++++++++----------------------------------
 1 file changed, 87 insertions(+), 168 deletions(-)

Comments

Ilya Dryomov June 17, 2021, 2:43 p.m. UTC | #1
On Wed, May 19, 2021 at 4:27 PM Peter Lieven <pl@kamp.de> wrote:
>
> Signed-off-by: Peter Lieven <pl@kamp.de>
> ---
>  block/rbd.c | 255 ++++++++++++++++++----------------------------------
>  1 file changed, 87 insertions(+), 168 deletions(-)
>
> diff --git a/block/rbd.c b/block/rbd.c
> index 97a2ae4c84..0d8612a988 100644
> --- a/block/rbd.c
> +++ b/block/rbd.c
> @@ -66,22 +66,6 @@ typedef enum {
>      RBD_AIO_FLUSH
>  } RBDAIOCmd;
>
> -typedef struct RBDAIOCB {
> -    BlockAIOCB common;
> -    int64_t ret;
> -    QEMUIOVector *qiov;
> -    RBDAIOCmd cmd;
> -    int error;
> -    struct BDRVRBDState *s;
> -} RBDAIOCB;
> -
> -typedef struct RADOSCB {
> -    RBDAIOCB *acb;
> -    struct BDRVRBDState *s;
> -    int64_t size;
> -    int64_t ret;
> -} RADOSCB;
> -
>  typedef struct BDRVRBDState {
>      rados_t cluster;
>      rados_ioctx_t io_ctx;
> @@ -93,6 +77,13 @@ typedef struct BDRVRBDState {
>      uint64_t object_size;
>  } BDRVRBDState;
>
> +typedef struct RBDTask {
> +    BlockDriverState *bs;
> +    Coroutine *co;
> +    bool complete;
> +    int64_t ret;
> +} RBDTask;
> +
>  static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
>                              BlockdevOptionsRbd *opts, bool cache,
>                              const char *keypairs, const char *secretid,
> @@ -325,13 +316,6 @@ static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
>      return ret;
>  }
>
> -static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
> -{
> -    RBDAIOCB *acb = rcb->acb;
> -    iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
> -               acb->qiov->size - offs);
> -}
> -
>  /* FIXME Deprecate and remove keypairs or make it available in QMP. */
>  static int qemu_rbd_do_create(BlockdevCreateOptions *options,
>                                const char *keypairs, const char *password_secret,
> @@ -450,46 +434,6 @@ exit:
>      return ret;
>  }
>
> -/*
> - * This aio completion is being called from rbd_finish_bh() and runs in qemu
> - * BH context.
> - */
> -static void qemu_rbd_complete_aio(RADOSCB *rcb)
> -{
> -    RBDAIOCB *acb = rcb->acb;
> -    int64_t r;
> -
> -    r = rcb->ret;
> -
> -    if (acb->cmd != RBD_AIO_READ) {
> -        if (r < 0) {
> -            acb->ret = r;
> -            acb->error = 1;
> -        } else if (!acb->error) {
> -            acb->ret = rcb->size;
> -        }
> -    } else {
> -        if (r < 0) {
> -            qemu_rbd_memset(rcb, 0);
> -            acb->ret = r;
> -            acb->error = 1;
> -        } else if (r < rcb->size) {
> -            qemu_rbd_memset(rcb, r);
> -            if (!acb->error) {
> -                acb->ret = rcb->size;
> -            }
> -        } else if (!acb->error) {
> -            acb->ret = r;
> -        }
> -    }
> -
> -    g_free(rcb);
> -
> -    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
> -
> -    qemu_aio_unref(acb);
> -}
> -
>  static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
>  {
>      const char **vals;
> @@ -826,89 +770,50 @@ static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
>      return 0;
>  }
>
> -static const AIOCBInfo rbd_aiocb_info = {
> -    .aiocb_size = sizeof(RBDAIOCB),
> -};
> -
> -static void rbd_finish_bh(void *opaque)
> +static void qemu_rbd_finish_bh(void *opaque)
>  {
> -    RADOSCB *rcb = opaque;
> -    qemu_rbd_complete_aio(rcb);
> +    RBDTask *task = opaque;
> +    task->complete = 1;
> +    aio_co_wake(task->co);
>  }
>
> -/*
> - * This is the callback function for rbd_aio_read and _write
> - *
> - * Note: this function is being called from a non qemu thread so
> - * we need to be careful about what we do here. Generally we only
> - * schedule a BH, and do the rest of the io completion handling
> - * from rbd_finish_bh() which runs in a qemu context.
> - */

I would adapt this comment instead of removing it.  I mean, it is
still true and the reason for going through aio_bh_schedule_oneshot()
instead of calling aio_co_wake() directly, right?

> -static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
> +static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
>  {
> -    RBDAIOCB *acb = rcb->acb;
> -
> -    rcb->ret = rbd_aio_get_return_value(c);
> +    task->ret = rbd_aio_get_return_value(c);
>      rbd_aio_release(c);
> -
> -    replay_bh_schedule_oneshot_event(bdrv_get_aio_context(acb->common.bs),
> -                                     rbd_finish_bh, rcb);
> +    aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
> +                            qemu_rbd_finish_bh, task);
>  }
>
> -static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
> -                                 int64_t off,
> -                                 QEMUIOVector *qiov,
> -                                 int64_t size,
> -                                 BlockCompletionFunc *cb,
> -                                 void *opaque,
> -                                 RBDAIOCmd cmd)
> +static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
> +                                          uint64_t offset,
> +                                          uint64_t bytes,
> +                                          QEMUIOVector *qiov,
> +                                          int flags,
> +                                          RBDAIOCmd cmd)
>  {
> -    RBDAIOCB *acb;
> -    RADOSCB *rcb = NULL;
> +    BDRVRBDState *s = bs->opaque;
> +    RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
>      rbd_completion_t c;
>      int r;
>
> -    BDRVRBDState *s = bs->opaque;
> +    assert(!qiov || qiov->size == bytes);
>
> -    acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
> -    acb->cmd = cmd;
> -    acb->qiov = qiov;
> -    assert(!qiov || qiov->size == size);
> -
> -    rcb = g_new(RADOSCB, 1);
> -
> -    acb->ret = 0;
> -    acb->error = 0;
> -    acb->s = s;
> -
> -    rcb->acb = acb;
> -    rcb->s = acb->s;
> -    rcb->size = size;
> -    r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
> +    r = rbd_aio_create_completion(&task,
> +                                  (rbd_callback_t) qemu_rbd_completion_cb, &c);
>      if (r < 0) {
> -        goto failed;
> +        return r;
>      }
>
>      switch (cmd) {
> -    case RBD_AIO_WRITE:
> -        /*
> -         * RBD APIs don't allow us to write more than actual size, so in order
> -         * to support growing images, we resize the image before write
> -         * operations that exceed the current size.
> -         */
> -        if (off + size > s->image_size) {
> -            r = qemu_rbd_resize(bs, off + size);
> -            if (r < 0) {
> -                goto failed_completion;
> -            }
> -        }
> -        r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
> -        break;
>      case RBD_AIO_READ:
> -        r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
> +        r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
> +        break;
> +    case RBD_AIO_WRITE:
> +        r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
>          break;
>      case RBD_AIO_DISCARD:
> -        r = rbd_aio_discard(s->image, off, size, c);
> +        r = rbd_aio_discard(s->image, offset, bytes, c);
>          break;
>      case RBD_AIO_FLUSH:
>          r = rbd_aio_flush(s->image, c);
> @@ -918,44 +823,69 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
>      }
>
>      if (r < 0) {
> -        goto failed_completion;
> +        error_report("rbd request failed early: cmd %d offset %" PRIu64
> +                     " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
> +                     bytes, flags, r, strerror(-r));
> +        rbd_aio_release(c);
> +        return r;
>      }
> -    return &acb->common;
>
> -failed_completion:
> -    rbd_aio_release(c);
> -failed:
> -    g_free(rcb);
> +    while (!task.complete) {
> +        qemu_coroutine_yield();
> +    }
>
> -    qemu_aio_unref(acb);
> -    return NULL;
> +    if (task.ret < 0) {
> +        error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
> +                     PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
> +                     bytes, flags, task.ret, strerror(-task.ret));
> +        return task.ret;
> +    }
> +
> +    /* zero pad short reads */
> +    if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
> +        qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);

I would use bytes instead of qiov->size here and on the previous
line.  In many systems the iovec can be larger than the op and it
took me a couple of minutes to spot the "qiov->size == bytes" assert.

Also, previously we zeroed the entire op on read errors.  I don't think
it matters but want to make sure this was dropped on purpose.

> +    }
> +
> +    return 0;
> +}
> +
> +static int
> +coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, uint64_t offset,
> +                               uint64_t bytes, QEMUIOVector *qiov,
> +                               int flags)
> +{
> +    return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
>  }
>
> -static BlockAIOCB *qemu_rbd_aio_preadv(BlockDriverState *bs,
> -                                       uint64_t offset, uint64_t bytes,
> -                                       QEMUIOVector *qiov, int flags,
> -                                       BlockCompletionFunc *cb,
> -                                       void *opaque)
> +static int
> +coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, uint64_t offset,
> +                                 uint64_t bytes, QEMUIOVector *qiov,
> +                                 int flags)
>  {
> -    return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
> -                         RBD_AIO_READ);
> +    BDRVRBDState *s = bs->opaque;
> +    /*
> +     * RBD APIs don't allow us to write more than actual size, so in order
> +     * to support growing images, we resize the image before write
> +     * operations that exceed the current size.
> +     */
> +    if (offset + bytes > s->image_size) {
> +        int r = qemu_rbd_resize(bs, offset + bytes);
> +        if (r < 0) {
> +            return r;
> +        }
> +    }

How can this be triggered today?  qemu-io used to be able to, but that
support was removed a long time ago:

    https://mail.gnu.org/archive/html/qemu-devel/2014-12/msg01592.html

Just checking that this piece of code is not vestigial at this point.

Thanks,

                Ilya
Peter Lieven June 18, 2021, 9:07 a.m. UTC | #2
Am 17.06.21 um 16:43 schrieb Ilya Dryomov:
> On Wed, May 19, 2021 at 4:27 PM Peter Lieven <pl@kamp.de> wrote:
>> Signed-off-by: Peter Lieven <pl@kamp.de>
>> ---
>>  block/rbd.c | 255 ++++++++++++++++++----------------------------------
>>  1 file changed, 87 insertions(+), 168 deletions(-)
>>
>> diff --git a/block/rbd.c b/block/rbd.c
>> index 97a2ae4c84..0d8612a988 100644
>> --- a/block/rbd.c
>> +++ b/block/rbd.c
>> @@ -66,22 +66,6 @@ typedef enum {
>>      RBD_AIO_FLUSH
>>  } RBDAIOCmd;
>>
>> -typedef struct RBDAIOCB {
>> -    BlockAIOCB common;
>> -    int64_t ret;
>> -    QEMUIOVector *qiov;
>> -    RBDAIOCmd cmd;
>> -    int error;
>> -    struct BDRVRBDState *s;
>> -} RBDAIOCB;
>> -
>> -typedef struct RADOSCB {
>> -    RBDAIOCB *acb;
>> -    struct BDRVRBDState *s;
>> -    int64_t size;
>> -    int64_t ret;
>> -} RADOSCB;
>> -
>>  typedef struct BDRVRBDState {
>>      rados_t cluster;
>>      rados_ioctx_t io_ctx;
>> @@ -93,6 +77,13 @@ typedef struct BDRVRBDState {
>>      uint64_t object_size;
>>  } BDRVRBDState;
>>
>> +typedef struct RBDTask {
>> +    BlockDriverState *bs;
>> +    Coroutine *co;
>> +    bool complete;
>> +    int64_t ret;
>> +} RBDTask;
>> +
>>  static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
>>                              BlockdevOptionsRbd *opts, bool cache,
>>                              const char *keypairs, const char *secretid,
>> @@ -325,13 +316,6 @@ static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
>>      return ret;
>>  }
>>
>> -static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
>> -{
>> -    RBDAIOCB *acb = rcb->acb;
>> -    iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
>> -               acb->qiov->size - offs);
>> -}
>> -
>>  /* FIXME Deprecate and remove keypairs or make it available in QMP. */
>>  static int qemu_rbd_do_create(BlockdevCreateOptions *options,
>>                                const char *keypairs, const char *password_secret,
>> @@ -450,46 +434,6 @@ exit:
>>      return ret;
>>  }
>>
>> -/*
>> - * This aio completion is being called from rbd_finish_bh() and runs in qemu
>> - * BH context.
>> - */
>> -static void qemu_rbd_complete_aio(RADOSCB *rcb)
>> -{
>> -    RBDAIOCB *acb = rcb->acb;
>> -    int64_t r;
>> -
>> -    r = rcb->ret;
>> -
>> -    if (acb->cmd != RBD_AIO_READ) {
>> -        if (r < 0) {
>> -            acb->ret = r;
>> -            acb->error = 1;
>> -        } else if (!acb->error) {
>> -            acb->ret = rcb->size;
>> -        }
>> -    } else {
>> -        if (r < 0) {
>> -            qemu_rbd_memset(rcb, 0);
>> -            acb->ret = r;
>> -            acb->error = 1;
>> -        } else if (r < rcb->size) {
>> -            qemu_rbd_memset(rcb, r);
>> -            if (!acb->error) {
>> -                acb->ret = rcb->size;
>> -            }
>> -        } else if (!acb->error) {
>> -            acb->ret = r;
>> -        }
>> -    }
>> -
>> -    g_free(rcb);
>> -
>> -    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
>> -
>> -    qemu_aio_unref(acb);
>> -}
>> -
>>  static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
>>  {
>>      const char **vals;
>> @@ -826,89 +770,50 @@ static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
>>      return 0;
>>  }
>>
>> -static const AIOCBInfo rbd_aiocb_info = {
>> -    .aiocb_size = sizeof(RBDAIOCB),
>> -};
>> -
>> -static void rbd_finish_bh(void *opaque)
>> +static void qemu_rbd_finish_bh(void *opaque)
>>  {
>> -    RADOSCB *rcb = opaque;
>> -    qemu_rbd_complete_aio(rcb);
>> +    RBDTask *task = opaque;
>> +    task->complete = 1;
>> +    aio_co_wake(task->co);
>>  }
>>
>> -/*
>> - * This is the callback function for rbd_aio_read and _write
>> - *
>> - * Note: this function is being called from a non qemu thread so
>> - * we need to be careful about what we do here. Generally we only
>> - * schedule a BH, and do the rest of the io completion handling
>> - * from rbd_finish_bh() which runs in a qemu context.
>> - */
> I would adapt this comment instead of removing it.  I mean, it is
> still true and the reason for going through aio_bh_schedule_oneshot()
> instead of calling aio_co_wake() directly, right?


Sure, its still right, but I think rbd is the only driver with this comment.

I can leave it in, no problem.


>
>> -static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
>> +static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
>>  {
>> -    RBDAIOCB *acb = rcb->acb;
>> -
>> -    rcb->ret = rbd_aio_get_return_value(c);
>> +    task->ret = rbd_aio_get_return_value(c);
>>      rbd_aio_release(c);
>> -
>> -    replay_bh_schedule_oneshot_event(bdrv_get_aio_context(acb->common.bs),
>> -                                     rbd_finish_bh, rcb);
>> +    aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
>> +                            qemu_rbd_finish_bh, task);
>>  }
>>
>> -static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
>> -                                 int64_t off,
>> -                                 QEMUIOVector *qiov,
>> -                                 int64_t size,
>> -                                 BlockCompletionFunc *cb,
>> -                                 void *opaque,
>> -                                 RBDAIOCmd cmd)
>> +static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
>> +                                          uint64_t offset,
>> +                                          uint64_t bytes,
>> +                                          QEMUIOVector *qiov,
>> +                                          int flags,
>> +                                          RBDAIOCmd cmd)
>>  {
>> -    RBDAIOCB *acb;
>> -    RADOSCB *rcb = NULL;
>> +    BDRVRBDState *s = bs->opaque;
>> +    RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
>>      rbd_completion_t c;
>>      int r;
>>
>> -    BDRVRBDState *s = bs->opaque;
>> +    assert(!qiov || qiov->size == bytes);
>>
>> -    acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
>> -    acb->cmd = cmd;
>> -    acb->qiov = qiov;
>> -    assert(!qiov || qiov->size == size);
>> -
>> -    rcb = g_new(RADOSCB, 1);
>> -
>> -    acb->ret = 0;
>> -    acb->error = 0;
>> -    acb->s = s;
>> -
>> -    rcb->acb = acb;
>> -    rcb->s = acb->s;
>> -    rcb->size = size;
>> -    r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
>> +    r = rbd_aio_create_completion(&task,
>> +                                  (rbd_callback_t) qemu_rbd_completion_cb, &c);
>>      if (r < 0) {
>> -        goto failed;
>> +        return r;
>>      }
>>
>>      switch (cmd) {
>> -    case RBD_AIO_WRITE:
>> -        /*
>> -         * RBD APIs don't allow us to write more than actual size, so in order
>> -         * to support growing images, we resize the image before write
>> -         * operations that exceed the current size.
>> -         */
>> -        if (off + size > s->image_size) {
>> -            r = qemu_rbd_resize(bs, off + size);
>> -            if (r < 0) {
>> -                goto failed_completion;
>> -            }
>> -        }
>> -        r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
>> -        break;
>>      case RBD_AIO_READ:
>> -        r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
>> +        r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
>> +        break;
>> +    case RBD_AIO_WRITE:
>> +        r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
>>          break;
>>      case RBD_AIO_DISCARD:
>> -        r = rbd_aio_discard(s->image, off, size, c);
>> +        r = rbd_aio_discard(s->image, offset, bytes, c);
>>          break;
>>      case RBD_AIO_FLUSH:
>>          r = rbd_aio_flush(s->image, c);
>> @@ -918,44 +823,69 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
>>      }
>>
>>      if (r < 0) {
>> -        goto failed_completion;
>> +        error_report("rbd request failed early: cmd %d offset %" PRIu64
>> +                     " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
>> +                     bytes, flags, r, strerror(-r));
>> +        rbd_aio_release(c);
>> +        return r;
>>      }
>> -    return &acb->common;
>>
>> -failed_completion:
>> -    rbd_aio_release(c);
>> -failed:
>> -    g_free(rcb);
>> +    while (!task.complete) {
>> +        qemu_coroutine_yield();
>> +    }
>>
>> -    qemu_aio_unref(acb);
>> -    return NULL;
>> +    if (task.ret < 0) {
>> +        error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
>> +                     PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
>> +                     bytes, flags, task.ret, strerror(-task.ret));
>> +        return task.ret;
>> +    }
>> +
>> +    /* zero pad short reads */
>> +    if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
>> +        qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
> I would use bytes instead of qiov->size here and on the previous
> line.  In many systems the iovec can be larger than the op and it
> took me a couple of minutes to spot the "qiov->size == bytes" assert.


qemu_iovec_memset is an operation on the qiov. If one missed

the assertion one could argue that we memset beyond the size of the qiov

if we pass bytes - task.ret as length.


Is it true that there a systems which pass a bigger iovec than the actual op?

In this case the assertion would trigger there.


>
> Also, previously we zeroed the entire op on read errors.  I don't think
> it matters but want to make sure this was dropped on purpose.


I dropped it because its not done in other drivers. If you feel that we could reveal sensitive information I can put the wiping back in.


>
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>> +static int
>> +coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, uint64_t offset,
>> +                               uint64_t bytes, QEMUIOVector *qiov,
>> +                               int flags)
>> +{
>> +    return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
>>  }
>>
>> -static BlockAIOCB *qemu_rbd_aio_preadv(BlockDriverState *bs,
>> -                                       uint64_t offset, uint64_t bytes,
>> -                                       QEMUIOVector *qiov, int flags,
>> -                                       BlockCompletionFunc *cb,
>> -                                       void *opaque)
>> +static int
>> +coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, uint64_t offset,
>> +                                 uint64_t bytes, QEMUIOVector *qiov,
>> +                                 int flags)
>>  {
>> -    return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
>> -                         RBD_AIO_READ);
>> +    BDRVRBDState *s = bs->opaque;
>> +    /*
>> +     * RBD APIs don't allow us to write more than actual size, so in order
>> +     * to support growing images, we resize the image before write
>> +     * operations that exceed the current size.
>> +     */
>> +    if (offset + bytes > s->image_size) {
>> +        int r = qemu_rbd_resize(bs, offset + bytes);
>> +        if (r < 0) {
>> +            return r;
>> +        }
>> +    }
> How can this be triggered today?  qemu-io used to be able to, but that
> support was removed a long time ago:
>
>     https://mail.gnu.org/archive/html/qemu-devel/2014-12/msg01592.html
>
> Just checking that this piece of code is not vestigial at this point.


We had this discussion before. The way to hit this if you want to create e.g. a qcow2 image on rbd. It does not make that much sense, but it was supported in the past.

I would vote for removing it, but it might break something somewhere.


Thanks,

Peter
Ilya Dryomov June 18, 2021, 11:26 a.m. UTC | #3
On Fri, Jun 18, 2021 at 11:07 AM Peter Lieven <pl@kamp.de> wrote:
>
> Am 17.06.21 um 16:43 schrieb Ilya Dryomov:
> > On Wed, May 19, 2021 at 4:27 PM Peter Lieven <pl@kamp.de> wrote:
> >> Signed-off-by: Peter Lieven <pl@kamp.de>
> >> ---
> >>  block/rbd.c | 255 ++++++++++++++++++----------------------------------
> >>  1 file changed, 87 insertions(+), 168 deletions(-)
> >>
> >> diff --git a/block/rbd.c b/block/rbd.c
> >> index 97a2ae4c84..0d8612a988 100644
> >> --- a/block/rbd.c
> >> +++ b/block/rbd.c
> >> @@ -66,22 +66,6 @@ typedef enum {
> >>      RBD_AIO_FLUSH
> >>  } RBDAIOCmd;
> >>
> >> -typedef struct RBDAIOCB {
> >> -    BlockAIOCB common;
> >> -    int64_t ret;
> >> -    QEMUIOVector *qiov;
> >> -    RBDAIOCmd cmd;
> >> -    int error;
> >> -    struct BDRVRBDState *s;
> >> -} RBDAIOCB;
> >> -
> >> -typedef struct RADOSCB {
> >> -    RBDAIOCB *acb;
> >> -    struct BDRVRBDState *s;
> >> -    int64_t size;
> >> -    int64_t ret;
> >> -} RADOSCB;
> >> -
> >>  typedef struct BDRVRBDState {
> >>      rados_t cluster;
> >>      rados_ioctx_t io_ctx;
> >> @@ -93,6 +77,13 @@ typedef struct BDRVRBDState {
> >>      uint64_t object_size;
> >>  } BDRVRBDState;
> >>
> >> +typedef struct RBDTask {
> >> +    BlockDriverState *bs;
> >> +    Coroutine *co;
> >> +    bool complete;
> >> +    int64_t ret;
> >> +} RBDTask;
> >> +
> >>  static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
> >>                              BlockdevOptionsRbd *opts, bool cache,
> >>                              const char *keypairs, const char *secretid,
> >> @@ -325,13 +316,6 @@ static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
> >>      return ret;
> >>  }
> >>
> >> -static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
> >> -{
> >> -    RBDAIOCB *acb = rcb->acb;
> >> -    iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
> >> -               acb->qiov->size - offs);
> >> -}
> >> -
> >>  /* FIXME Deprecate and remove keypairs or make it available in QMP. */
> >>  static int qemu_rbd_do_create(BlockdevCreateOptions *options,
> >>                                const char *keypairs, const char *password_secret,
> >> @@ -450,46 +434,6 @@ exit:
> >>      return ret;
> >>  }
> >>
> >> -/*
> >> - * This aio completion is being called from rbd_finish_bh() and runs in qemu
> >> - * BH context.
> >> - */
> >> -static void qemu_rbd_complete_aio(RADOSCB *rcb)
> >> -{
> >> -    RBDAIOCB *acb = rcb->acb;
> >> -    int64_t r;
> >> -
> >> -    r = rcb->ret;
> >> -
> >> -    if (acb->cmd != RBD_AIO_READ) {
> >> -        if (r < 0) {
> >> -            acb->ret = r;
> >> -            acb->error = 1;
> >> -        } else if (!acb->error) {
> >> -            acb->ret = rcb->size;
> >> -        }
> >> -    } else {
> >> -        if (r < 0) {
> >> -            qemu_rbd_memset(rcb, 0);
> >> -            acb->ret = r;
> >> -            acb->error = 1;
> >> -        } else if (r < rcb->size) {
> >> -            qemu_rbd_memset(rcb, r);
> >> -            if (!acb->error) {
> >> -                acb->ret = rcb->size;
> >> -            }
> >> -        } else if (!acb->error) {
> >> -            acb->ret = r;
> >> -        }
> >> -    }
> >> -
> >> -    g_free(rcb);
> >> -
> >> -    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
> >> -
> >> -    qemu_aio_unref(acb);
> >> -}
> >> -
> >>  static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
> >>  {
> >>      const char **vals;
> >> @@ -826,89 +770,50 @@ static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
> >>      return 0;
> >>  }
> >>
> >> -static const AIOCBInfo rbd_aiocb_info = {
> >> -    .aiocb_size = sizeof(RBDAIOCB),
> >> -};
> >> -
> >> -static void rbd_finish_bh(void *opaque)
> >> +static void qemu_rbd_finish_bh(void *opaque)
> >>  {
> >> -    RADOSCB *rcb = opaque;
> >> -    qemu_rbd_complete_aio(rcb);
> >> +    RBDTask *task = opaque;
> >> +    task->complete = 1;
> >> +    aio_co_wake(task->co);
> >>  }
> >>
> >> -/*
> >> - * This is the callback function for rbd_aio_read and _write
> >> - *
> >> - * Note: this function is being called from a non qemu thread so
> >> - * we need to be careful about what we do here. Generally we only
> >> - * schedule a BH, and do the rest of the io completion handling
> >> - * from rbd_finish_bh() which runs in a qemu context.
> >> - */
> > I would adapt this comment instead of removing it.  I mean, it is
> > still true and the reason for going through aio_bh_schedule_oneshot()
> > instead of calling aio_co_wake() directly, right?
>
>
> Sure, its still right, but I think rbd is the only driver with this comment.
>
> I can leave it in, no problem.

Yeah, just massage it a bit to reflect the reality (at least the
function name).

>
>
> >
> >> -static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
> >> +static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
> >>  {
> >> -    RBDAIOCB *acb = rcb->acb;
> >> -
> >> -    rcb->ret = rbd_aio_get_return_value(c);
> >> +    task->ret = rbd_aio_get_return_value(c);
> >>      rbd_aio_release(c);
> >> -
> >> -    replay_bh_schedule_oneshot_event(bdrv_get_aio_context(acb->common.bs),
> >> -                                     rbd_finish_bh, rcb);
> >> +    aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
> >> +                            qemu_rbd_finish_bh, task);
> >>  }
> >>
> >> -static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
> >> -                                 int64_t off,
> >> -                                 QEMUIOVector *qiov,
> >> -                                 int64_t size,
> >> -                                 BlockCompletionFunc *cb,
> >> -                                 void *opaque,
> >> -                                 RBDAIOCmd cmd)
> >> +static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
> >> +                                          uint64_t offset,
> >> +                                          uint64_t bytes,
> >> +                                          QEMUIOVector *qiov,
> >> +                                          int flags,
> >> +                                          RBDAIOCmd cmd)
> >>  {
> >> -    RBDAIOCB *acb;
> >> -    RADOSCB *rcb = NULL;
> >> +    BDRVRBDState *s = bs->opaque;
> >> +    RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
> >>      rbd_completion_t c;
> >>      int r;
> >>
> >> -    BDRVRBDState *s = bs->opaque;
> >> +    assert(!qiov || qiov->size == bytes);
> >>
> >> -    acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
> >> -    acb->cmd = cmd;
> >> -    acb->qiov = qiov;
> >> -    assert(!qiov || qiov->size == size);
> >> -
> >> -    rcb = g_new(RADOSCB, 1);
> >> -
> >> -    acb->ret = 0;
> >> -    acb->error = 0;
> >> -    acb->s = s;
> >> -
> >> -    rcb->acb = acb;
> >> -    rcb->s = acb->s;
> >> -    rcb->size = size;
> >> -    r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
> >> +    r = rbd_aio_create_completion(&task,
> >> +                                  (rbd_callback_t) qemu_rbd_completion_cb, &c);
> >>      if (r < 0) {
> >> -        goto failed;
> >> +        return r;
> >>      }
> >>
> >>      switch (cmd) {
> >> -    case RBD_AIO_WRITE:
> >> -        /*
> >> -         * RBD APIs don't allow us to write more than actual size, so in order
> >> -         * to support growing images, we resize the image before write
> >> -         * operations that exceed the current size.
> >> -         */
> >> -        if (off + size > s->image_size) {
> >> -            r = qemu_rbd_resize(bs, off + size);
> >> -            if (r < 0) {
> >> -                goto failed_completion;
> >> -            }
> >> -        }
> >> -        r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
> >> -        break;
> >>      case RBD_AIO_READ:
> >> -        r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
> >> +        r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
> >> +        break;
> >> +    case RBD_AIO_WRITE:
> >> +        r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
> >>          break;
> >>      case RBD_AIO_DISCARD:
> >> -        r = rbd_aio_discard(s->image, off, size, c);
> >> +        r = rbd_aio_discard(s->image, offset, bytes, c);
> >>          break;
> >>      case RBD_AIO_FLUSH:
> >>          r = rbd_aio_flush(s->image, c);
> >> @@ -918,44 +823,69 @@ static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
> >>      }
> >>
> >>      if (r < 0) {
> >> -        goto failed_completion;
> >> +        error_report("rbd request failed early: cmd %d offset %" PRIu64
> >> +                     " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
> >> +                     bytes, flags, r, strerror(-r));
> >> +        rbd_aio_release(c);
> >> +        return r;
> >>      }
> >> -    return &acb->common;
> >>
> >> -failed_completion:
> >> -    rbd_aio_release(c);
> >> -failed:
> >> -    g_free(rcb);
> >> +    while (!task.complete) {
> >> +        qemu_coroutine_yield();
> >> +    }
> >>
> >> -    qemu_aio_unref(acb);
> >> -    return NULL;
> >> +    if (task.ret < 0) {
> >> +        error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
> >> +                     PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
> >> +                     bytes, flags, task.ret, strerror(-task.ret));
> >> +        return task.ret;
> >> +    }
> >> +
> >> +    /* zero pad short reads */
> >> +    if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
> >> +        qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
> > I would use bytes instead of qiov->size here and on the previous
> > line.  In many systems the iovec can be larger than the op and it
> > took me a couple of minutes to spot the "qiov->size == bytes" assert.
>
>
> qemu_iovec_memset is an operation on the qiov. If one missed
>
> the assertion one could argue that we memset beyond the size of the qiov
>
> if we pass bytes - task.ret as length.

Sorry, disregard -- I missed that rbd_aio_readv/writev() are specified
in terms of iovecs (i.e. iovec base + iovec count instead of iovec base +
number of bytes).

>
>
> Is it true that there a systems which pass a bigger iovec than the actual op?

Perhaps not in QEMU but it comes in many other places.  For example
in the kernel when the block device driver gets a "bio" which it doesn't
own and should act only on a small piece of it, etc.

>
> In this case the assertion would trigger there.
>
>
> >
> > Also, previously we zeroed the entire op on read errors.  I don't think
> > it matters but want to make sure this was dropped on purpose.
>
>
> I dropped it because its not done in other drivers. If you feel that we could reveal sensitive information I can put the wiping back in.

I don't think so and we don't do this in kernel rbd either.

>
>
> >
> >> +    }
> >> +
> >> +    return 0;
> >> +}
> >> +
> >> +static int
> >> +coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, uint64_t offset,
> >> +                               uint64_t bytes, QEMUIOVector *qiov,
> >> +                               int flags)
> >> +{
> >> +    return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
> >>  }
> >>
> >> -static BlockAIOCB *qemu_rbd_aio_preadv(BlockDriverState *bs,
> >> -                                       uint64_t offset, uint64_t bytes,
> >> -                                       QEMUIOVector *qiov, int flags,
> >> -                                       BlockCompletionFunc *cb,
> >> -                                       void *opaque)
> >> +static int
> >> +coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, uint64_t offset,
> >> +                                 uint64_t bytes, QEMUIOVector *qiov,
> >> +                                 int flags)
> >>  {
> >> -    return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
> >> -                         RBD_AIO_READ);
> >> +    BDRVRBDState *s = bs->opaque;
> >> +    /*
> >> +     * RBD APIs don't allow us to write more than actual size, so in order
> >> +     * to support growing images, we resize the image before write
> >> +     * operations that exceed the current size.
> >> +     */
> >> +    if (offset + bytes > s->image_size) {
> >> +        int r = qemu_rbd_resize(bs, offset + bytes);
> >> +        if (r < 0) {
> >> +            return r;
> >> +        }
> >> +    }
> > How can this be triggered today?  qemu-io used to be able to, but that
> > support was removed a long time ago:
> >
> >     https://mail.gnu.org/archive/html/qemu-devel/2014-12/msg01592.html
> >
> > Just checking that this piece of code is not vestigial at this point.
>
>
> We had this discussion before. The way to hit this if you want to create e.g. a qcow2 image on rbd. It does not make that much sense, but it was supported in the past.

Ah, so something like

    qemu-img create -f qcow2 rbd:foo/bar 10G

>
> I would vote for removing it, but it might break something somewhere.

Yeah, it's weird but definitely not up for removal in a patch that
migrates from one internal API to another.

Thanks,

                Ilya
diff mbox series

Patch

diff --git a/block/rbd.c b/block/rbd.c
index 97a2ae4c84..0d8612a988 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -66,22 +66,6 @@  typedef enum {
     RBD_AIO_FLUSH
 } RBDAIOCmd;
 
-typedef struct RBDAIOCB {
-    BlockAIOCB common;
-    int64_t ret;
-    QEMUIOVector *qiov;
-    RBDAIOCmd cmd;
-    int error;
-    struct BDRVRBDState *s;
-} RBDAIOCB;
-
-typedef struct RADOSCB {
-    RBDAIOCB *acb;
-    struct BDRVRBDState *s;
-    int64_t size;
-    int64_t ret;
-} RADOSCB;
-
 typedef struct BDRVRBDState {
     rados_t cluster;
     rados_ioctx_t io_ctx;
@@ -93,6 +77,13 @@  typedef struct BDRVRBDState {
     uint64_t object_size;
 } BDRVRBDState;
 
+typedef struct RBDTask {
+    BlockDriverState *bs;
+    Coroutine *co;
+    bool complete;
+    int64_t ret;
+} RBDTask;
+
 static int qemu_rbd_connect(rados_t *cluster, rados_ioctx_t *io_ctx,
                             BlockdevOptionsRbd *opts, bool cache,
                             const char *keypairs, const char *secretid,
@@ -325,13 +316,6 @@  static int qemu_rbd_set_keypairs(rados_t cluster, const char *keypairs_json,
     return ret;
 }
 
-static void qemu_rbd_memset(RADOSCB *rcb, int64_t offs)
-{
-    RBDAIOCB *acb = rcb->acb;
-    iov_memset(acb->qiov->iov, acb->qiov->niov, offs, 0,
-               acb->qiov->size - offs);
-}
-
 /* FIXME Deprecate and remove keypairs or make it available in QMP. */
 static int qemu_rbd_do_create(BlockdevCreateOptions *options,
                               const char *keypairs, const char *password_secret,
@@ -450,46 +434,6 @@  exit:
     return ret;
 }
 
-/*
- * This aio completion is being called from rbd_finish_bh() and runs in qemu
- * BH context.
- */
-static void qemu_rbd_complete_aio(RADOSCB *rcb)
-{
-    RBDAIOCB *acb = rcb->acb;
-    int64_t r;
-
-    r = rcb->ret;
-
-    if (acb->cmd != RBD_AIO_READ) {
-        if (r < 0) {
-            acb->ret = r;
-            acb->error = 1;
-        } else if (!acb->error) {
-            acb->ret = rcb->size;
-        }
-    } else {
-        if (r < 0) {
-            qemu_rbd_memset(rcb, 0);
-            acb->ret = r;
-            acb->error = 1;
-        } else if (r < rcb->size) {
-            qemu_rbd_memset(rcb, r);
-            if (!acb->error) {
-                acb->ret = rcb->size;
-            }
-        } else if (!acb->error) {
-            acb->ret = r;
-        }
-    }
-
-    g_free(rcb);
-
-    acb->common.cb(acb->common.opaque, (acb->ret > 0 ? 0 : acb->ret));
-
-    qemu_aio_unref(acb);
-}
-
 static char *qemu_rbd_mon_host(BlockdevOptionsRbd *opts, Error **errp)
 {
     const char **vals;
@@ -826,89 +770,50 @@  static int qemu_rbd_resize(BlockDriverState *bs, uint64_t size)
     return 0;
 }
 
-static const AIOCBInfo rbd_aiocb_info = {
-    .aiocb_size = sizeof(RBDAIOCB),
-};
-
-static void rbd_finish_bh(void *opaque)
+static void qemu_rbd_finish_bh(void *opaque)
 {
-    RADOSCB *rcb = opaque;
-    qemu_rbd_complete_aio(rcb);
+    RBDTask *task = opaque;
+    task->complete = 1;
+    aio_co_wake(task->co);
 }
 
-/*
- * This is the callback function for rbd_aio_read and _write
- *
- * Note: this function is being called from a non qemu thread so
- * we need to be careful about what we do here. Generally we only
- * schedule a BH, and do the rest of the io completion handling
- * from rbd_finish_bh() which runs in a qemu context.
- */
-static void rbd_finish_aiocb(rbd_completion_t c, RADOSCB *rcb)
+static void qemu_rbd_completion_cb(rbd_completion_t c, RBDTask *task)
 {
-    RBDAIOCB *acb = rcb->acb;
-
-    rcb->ret = rbd_aio_get_return_value(c);
+    task->ret = rbd_aio_get_return_value(c);
     rbd_aio_release(c);
-
-    replay_bh_schedule_oneshot_event(bdrv_get_aio_context(acb->common.bs),
-                                     rbd_finish_bh, rcb);
+    aio_bh_schedule_oneshot(bdrv_get_aio_context(task->bs),
+                            qemu_rbd_finish_bh, task);
 }
 
-static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
-                                 int64_t off,
-                                 QEMUIOVector *qiov,
-                                 int64_t size,
-                                 BlockCompletionFunc *cb,
-                                 void *opaque,
-                                 RBDAIOCmd cmd)
+static int coroutine_fn qemu_rbd_start_co(BlockDriverState *bs,
+                                          uint64_t offset,
+                                          uint64_t bytes,
+                                          QEMUIOVector *qiov,
+                                          int flags,
+                                          RBDAIOCmd cmd)
 {
-    RBDAIOCB *acb;
-    RADOSCB *rcb = NULL;
+    BDRVRBDState *s = bs->opaque;
+    RBDTask task = { .bs = bs, .co = qemu_coroutine_self() };
     rbd_completion_t c;
     int r;
 
-    BDRVRBDState *s = bs->opaque;
+    assert(!qiov || qiov->size == bytes);
 
-    acb = qemu_aio_get(&rbd_aiocb_info, bs, cb, opaque);
-    acb->cmd = cmd;
-    acb->qiov = qiov;
-    assert(!qiov || qiov->size == size);
-
-    rcb = g_new(RADOSCB, 1);
-
-    acb->ret = 0;
-    acb->error = 0;
-    acb->s = s;
-
-    rcb->acb = acb;
-    rcb->s = acb->s;
-    rcb->size = size;
-    r = rbd_aio_create_completion(rcb, (rbd_callback_t) rbd_finish_aiocb, &c);
+    r = rbd_aio_create_completion(&task,
+                                  (rbd_callback_t) qemu_rbd_completion_cb, &c);
     if (r < 0) {
-        goto failed;
+        return r;
     }
 
     switch (cmd) {
-    case RBD_AIO_WRITE:
-        /*
-         * RBD APIs don't allow us to write more than actual size, so in order
-         * to support growing images, we resize the image before write
-         * operations that exceed the current size.
-         */
-        if (off + size > s->image_size) {
-            r = qemu_rbd_resize(bs, off + size);
-            if (r < 0) {
-                goto failed_completion;
-            }
-        }
-        r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, off, c);
-        break;
     case RBD_AIO_READ:
-        r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, off, c);
+        r = rbd_aio_readv(s->image, qiov->iov, qiov->niov, offset, c);
+        break;
+    case RBD_AIO_WRITE:
+        r = rbd_aio_writev(s->image, qiov->iov, qiov->niov, offset, c);
         break;
     case RBD_AIO_DISCARD:
-        r = rbd_aio_discard(s->image, off, size, c);
+        r = rbd_aio_discard(s->image, offset, bytes, c);
         break;
     case RBD_AIO_FLUSH:
         r = rbd_aio_flush(s->image, c);
@@ -918,44 +823,69 @@  static BlockAIOCB *rbd_start_aio(BlockDriverState *bs,
     }
 
     if (r < 0) {
-        goto failed_completion;
+        error_report("rbd request failed early: cmd %d offset %" PRIu64
+                     " bytes %" PRIu64 " flags %d r %d (%s)", cmd, offset,
+                     bytes, flags, r, strerror(-r));
+        rbd_aio_release(c);
+        return r;
     }
-    return &acb->common;
 
-failed_completion:
-    rbd_aio_release(c);
-failed:
-    g_free(rcb);
+    while (!task.complete) {
+        qemu_coroutine_yield();
+    }
 
-    qemu_aio_unref(acb);
-    return NULL;
+    if (task.ret < 0) {
+        error_report("rbd request failed: cmd %d offset %" PRIu64 " bytes %"
+                     PRIu64 " flags %d task.ret %" PRIi64 " (%s)", cmd, offset,
+                     bytes, flags, task.ret, strerror(-task.ret));
+        return task.ret;
+    }
+
+    /* zero pad short reads */
+    if (cmd == RBD_AIO_READ && task.ret < qiov->size) {
+        qemu_iovec_memset(qiov, task.ret, 0, qiov->size - task.ret);
+    }
+
+    return 0;
+}
+
+static int
+coroutine_fn qemu_rbd_co_preadv(BlockDriverState *bs, uint64_t offset,
+                               uint64_t bytes, QEMUIOVector *qiov,
+                               int flags)
+{
+    return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_READ);
 }
 
-static BlockAIOCB *qemu_rbd_aio_preadv(BlockDriverState *bs,
-                                       uint64_t offset, uint64_t bytes,
-                                       QEMUIOVector *qiov, int flags,
-                                       BlockCompletionFunc *cb,
-                                       void *opaque)
+static int
+coroutine_fn qemu_rbd_co_pwritev(BlockDriverState *bs, uint64_t offset,
+                                 uint64_t bytes, QEMUIOVector *qiov,
+                                 int flags)
 {
-    return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
-                         RBD_AIO_READ);
+    BDRVRBDState *s = bs->opaque;
+    /*
+     * RBD APIs don't allow us to write more than actual size, so in order
+     * to support growing images, we resize the image before write
+     * operations that exceed the current size.
+     */
+    if (offset + bytes > s->image_size) {
+        int r = qemu_rbd_resize(bs, offset + bytes);
+        if (r < 0) {
+            return r;
+        }
+    }
+    return qemu_rbd_start_co(bs, offset, bytes, qiov, flags, RBD_AIO_WRITE);
 }
 
-static BlockAIOCB *qemu_rbd_aio_pwritev(BlockDriverState *bs,
-                                        uint64_t offset, uint64_t bytes,
-                                        QEMUIOVector *qiov, int flags,
-                                        BlockCompletionFunc *cb,
-                                        void *opaque)
+static int coroutine_fn qemu_rbd_co_flush(BlockDriverState *bs)
 {
-    return rbd_start_aio(bs, offset, qiov, bytes, cb, opaque,
-                         RBD_AIO_WRITE);
+    return qemu_rbd_start_co(bs, 0, 0, NULL, 0, RBD_AIO_FLUSH);
 }
 
-static BlockAIOCB *qemu_rbd_aio_flush(BlockDriverState *bs,
-                                      BlockCompletionFunc *cb,
-                                      void *opaque)
+static int coroutine_fn qemu_rbd_co_pdiscard(BlockDriverState *bs,
+                                             int64_t offset, int count)
 {
-    return rbd_start_aio(bs, 0, NULL, 0, cb, opaque, RBD_AIO_FLUSH);
+    return qemu_rbd_start_co(bs, offset, count, NULL, 0, RBD_AIO_DISCARD);
 }
 
 static int qemu_rbd_getinfo(BlockDriverState *bs, BlockDriverInfo *bdi)
@@ -1116,16 +1046,6 @@  static int qemu_rbd_snap_list(BlockDriverState *bs,
     return snap_count;
 }
 
-static BlockAIOCB *qemu_rbd_aio_pdiscard(BlockDriverState *bs,
-                                         int64_t offset,
-                                         int bytes,
-                                         BlockCompletionFunc *cb,
-                                         void *opaque)
-{
-    return rbd_start_aio(bs, offset, NULL, bytes, cb, opaque,
-                         RBD_AIO_DISCARD);
-}
-
 static void coroutine_fn qemu_rbd_co_invalidate_cache(BlockDriverState *bs,
                                                       Error **errp)
 {
@@ -1189,11 +1109,10 @@  static BlockDriver bdrv_rbd = {
     .bdrv_co_truncate       = qemu_rbd_co_truncate,
     .protocol_name          = "rbd",
 
-    .bdrv_aio_preadv        = qemu_rbd_aio_preadv,
-    .bdrv_aio_pwritev       = qemu_rbd_aio_pwritev,
-
-    .bdrv_aio_flush         = qemu_rbd_aio_flush,
-    .bdrv_aio_pdiscard      = qemu_rbd_aio_pdiscard,
+    .bdrv_co_preadv         = qemu_rbd_co_preadv,
+    .bdrv_co_pwritev        = qemu_rbd_co_pwritev,
+    .bdrv_co_flush_to_disk  = qemu_rbd_co_flush,
+    .bdrv_co_pdiscard       = qemu_rbd_co_pdiscard,
 
     .bdrv_snapshot_create   = qemu_rbd_snap_create,
     .bdrv_snapshot_delete   = qemu_rbd_snap_remove,