Message ID | 20170913181910.29688-6-mreitz@redhat.com (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
On Wed, 09/13 20:18, Max Reitz wrote: > In order to talk to the source BDS (and maybe in the future to the > target BDS as well) directly, we need to convert our existing AIO > requests into coroutine I/O requests. > > Signed-off-by: Max Reitz <mreitz@redhat.com> > --- > block/mirror.c | 134 +++++++++++++++++++++++++++++++++------------------------ > 1 file changed, 78 insertions(+), 56 deletions(-) > > diff --git a/block/mirror.c b/block/mirror.c > index 4664b0516f..2b3297aa61 100644 > --- a/block/mirror.c > +++ b/block/mirror.c > @@ -80,6 +80,9 @@ typedef struct MirrorOp { > QEMUIOVector qiov; > int64_t offset; > uint64_t bytes; > + > + /* Set by mirror_co_read() before yielding for the first time */ > + uint64_t bytes_copied; > } MirrorOp; > > typedef enum MirrorMethod { > @@ -101,7 +104,7 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read, > } > } > > -static void mirror_iteration_done(MirrorOp *op, int ret) > +static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret) > { > MirrorBlockJob *s = op->s; > struct iovec *iov; > @@ -138,9 +141,8 @@ static void mirror_iteration_done(MirrorOp *op, int ret) > } > } > > -static void mirror_write_complete(void *opaque, int ret) > +static void coroutine_fn mirror_write_complete(MirrorOp *op, int ret) > { > - MirrorOp *op = opaque; > MirrorBlockJob *s = op->s; > > aio_context_acquire(blk_get_aio_context(s->common.blk)); > @@ -158,9 +160,8 @@ static void mirror_write_complete(void *opaque, int ret) > aio_context_release(blk_get_aio_context(s->common.blk)); > } > > -static void mirror_read_complete(void *opaque, int ret) > +static void coroutine_fn mirror_read_complete(MirrorOp *op, int ret) > { > - MirrorOp *op = opaque; > MirrorBlockJob *s = op->s; > > aio_context_acquire(blk_get_aio_context(s->common.blk)); > @@ -176,8 +177,11 @@ static void mirror_read_complete(void *opaque, int ret) > > mirror_iteration_done(op, ret); > } else { > - blk_aio_pwritev(s->target, op->offset, &op->qiov, > - 0, mirror_write_complete, op); > + int ret; > + > + ret = blk_co_pwritev(s->target, op->offset, > + op->qiov.size, &op->qiov, 0); > + mirror_write_complete(op, ret); > } > aio_context_release(blk_get_aio_context(s->common.blk)); > } > @@ -242,53 +246,49 @@ static inline void mirror_wait_for_io(MirrorBlockJob *s) > * (new_end - offset) if tail is rounded up or down due to > * alignment or buffer limit. > */ > -static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset, > - uint64_t bytes) > +static void coroutine_fn mirror_co_read(void *opaque) > { > + MirrorOp *op = opaque; > + MirrorBlockJob *s = op->s; > BlockBackend *source = s->common.blk; > int nb_chunks; > uint64_t ret; > - MirrorOp *op; > uint64_t max_bytes; > > max_bytes = s->granularity * s->max_iov; > > /* We can only handle as much as buf_size at a time. */ > - bytes = MIN(s->buf_size, MIN(max_bytes, bytes)); > - assert(bytes); > - assert(bytes < BDRV_REQUEST_MAX_BYTES); > - ret = bytes; > + op->bytes = MIN(s->buf_size, MIN(max_bytes, op->bytes)); > + assert(op->bytes); > + assert(op->bytes < BDRV_REQUEST_MAX_BYTES); > + op->bytes_copied = op->bytes; > > if (s->cow_bitmap) { > - ret += mirror_cow_align(s, &offset, &bytes); > + op->bytes_copied += mirror_cow_align(s, &op->offset, &op->bytes); > } > - assert(bytes <= s->buf_size); > + /* Cannot exceed BDRV_REQUEST_MAX_BYTES + INT_MAX */ > + assert(op->bytes_copied <= UINT_MAX); > + assert(op->bytes <= s->buf_size); > /* The offset is granularity-aligned because: > * 1) Caller passes in aligned values; > * 2) mirror_cow_align is used only when target cluster is larger. */ > - assert(QEMU_IS_ALIGNED(offset, s->granularity)); > + assert(QEMU_IS_ALIGNED(op->offset, s->granularity)); > /* The range is sector-aligned, since bdrv_getlength() rounds up. */ > - assert(QEMU_IS_ALIGNED(bytes, BDRV_SECTOR_SIZE)); > - nb_chunks = DIV_ROUND_UP(bytes, s->granularity); > + assert(QEMU_IS_ALIGNED(op->bytes, BDRV_SECTOR_SIZE)); > + nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity); > > while (s->buf_free_count < nb_chunks) { > - trace_mirror_yield_in_flight(s, offset, s->in_flight); > + trace_mirror_yield_in_flight(s, op->offset, s->in_flight); > mirror_wait_for_io(s); > } > > - /* Allocate a MirrorOp that is used as an AIO callback. */ > - op = g_new(MirrorOp, 1); > - op->s = s; > - op->offset = offset; > - op->bytes = bytes; > - > /* Now make a QEMUIOVector taking enough granularity-sized chunks > * from s->buf_free. > */ > qemu_iovec_init(&op->qiov, nb_chunks); > while (nb_chunks-- > 0) { > MirrorBuffer *buf = QSIMPLEQ_FIRST(&s->buf_free); > - size_t remaining = bytes - op->qiov.size; > + size_t remaining = op->bytes - op->qiov.size; > > QSIMPLEQ_REMOVE_HEAD(&s->buf_free, next); > s->buf_free_count--; > @@ -297,53 +297,75 @@ static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset, > > /* Copy the dirty cluster. */ > s->in_flight++; > - s->bytes_in_flight += bytes; > - trace_mirror_one_iteration(s, offset, bytes); > + s->bytes_in_flight += op->bytes; > + trace_mirror_one_iteration(s, op->offset, op->bytes); > > - blk_aio_preadv(source, offset, &op->qiov, 0, mirror_read_complete, op); > - return ret; > + ret = blk_co_preadv(source, op->offset, op->bytes, &op->qiov, 0); > + mirror_read_complete(op, ret); > } > > -static void mirror_do_zero_or_discard(MirrorBlockJob *s, > - int64_t offset, > - uint64_t bytes, > - bool is_discard) > +static void coroutine_fn mirror_co_zero(void *opaque) > { > - MirrorOp *op; > + MirrorOp *op = opaque; > + int ret; > > - /* Allocate a MirrorOp that is used as an AIO callback. The qiov is zeroed > - * so the freeing in mirror_iteration_done is nop. */ > - op = g_new0(MirrorOp, 1); > - op->s = s; > - op->offset = offset; > - op->bytes = bytes; > + op->s->in_flight++; > + op->s->bytes_in_flight += op->bytes; > > - s->in_flight++; > - s->bytes_in_flight += bytes; > - if (is_discard) { > - blk_aio_pdiscard(s->target, offset, > - op->bytes, mirror_write_complete, op); > - } else { > - blk_aio_pwrite_zeroes(s->target, offset, > - op->bytes, s->unmap ? BDRV_REQ_MAY_UNMAP : 0, > - mirror_write_complete, op); > - } > + ret = blk_co_pwrite_zeroes(op->s->target, op->offset, op->bytes, > + op->s->unmap ? BDRV_REQ_MAY_UNMAP : 0); > + mirror_write_complete(op, ret); > +} > + > +static void coroutine_fn mirror_co_discard(void *opaque) > +{ > + MirrorOp *op = opaque; > + int ret; > + > + op->s->in_flight++; > + op->s->bytes_in_flight += op->bytes; > + > + ret = blk_co_pdiscard(op->s->target, op->offset, op->bytes); > + mirror_write_complete(op, ret); > } > > static unsigned mirror_perform(MirrorBlockJob *s, int64_t offset, > unsigned bytes, MirrorMethod mirror_method) > { > + MirrorOp *op; > + Coroutine *co; > + unsigned ret = bytes; > + > + op = g_new(MirrorOp, 1); > + *op = (MirrorOp){ > + .s = s, > + .offset = offset, > + .bytes = bytes, > + }; > + > switch (mirror_method) { > case MIRROR_METHOD_COPY: > - return mirror_do_read(s, offset, bytes); > + co = qemu_coroutine_create(mirror_co_read, op); > + break; > case MIRROR_METHOD_ZERO: > + co = qemu_coroutine_create(mirror_co_zero, op); > + break; > case MIRROR_METHOD_DISCARD: > - mirror_do_zero_or_discard(s, offset, bytes, > - mirror_method == MIRROR_METHOD_DISCARD); > - return bytes; > + co = qemu_coroutine_create(mirror_co_discard, op); > + break; > default: > abort(); > } > + > + qemu_coroutine_enter(co); > + > + if (mirror_method == MIRROR_METHOD_COPY) { > + /* Same assertion as in mirror_co_read() */ > + assert(op->bytes_copied <= UINT_MAX); > + ret = op->bytes_copied; > + } This special casing is a bit ugly. Can you just make mirror_co_zero and mirror_co_discard set op->bytes_copied too? (and perhaps rename to op->bytes_handled) If so the comment in MirrorOp needs an update too. And is it better to initialize it to -1 before entering coroutine, then assert it is != -1 afterwards? > + > + return ret; > } > > static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s) > -- > 2.13.5 > Fam
On 2017-09-18 08:02, Fam Zheng wrote: > On Wed, 09/13 20:18, Max Reitz wrote: >> In order to talk to the source BDS (and maybe in the future to the >> target BDS as well) directly, we need to convert our existing AIO >> requests into coroutine I/O requests. >> >> Signed-off-by: Max Reitz <mreitz@redhat.com> >> --- >> block/mirror.c | 134 +++++++++++++++++++++++++++++++++------------------------ >> 1 file changed, 78 insertions(+), 56 deletions(-) >> >> diff --git a/block/mirror.c b/block/mirror.c >> index 4664b0516f..2b3297aa61 100644 >> --- a/block/mirror.c >> +++ b/block/mirror.c >> @@ -80,6 +80,9 @@ typedef struct MirrorOp { >> QEMUIOVector qiov; >> int64_t offset; >> uint64_t bytes; >> + >> + /* Set by mirror_co_read() before yielding for the first time */ >> + uint64_t bytes_copied; >> } MirrorOp; >> >> typedef enum MirrorMethod { >> @@ -101,7 +104,7 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read, >> } >> } >> >> -static void mirror_iteration_done(MirrorOp *op, int ret) >> +static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret) >> { >> MirrorBlockJob *s = op->s; >> struct iovec *iov; >> @@ -138,9 +141,8 @@ static void mirror_iteration_done(MirrorOp *op, int ret) >> } >> } >> >> -static void mirror_write_complete(void *opaque, int ret) >> +static void coroutine_fn mirror_write_complete(MirrorOp *op, int ret) >> { >> - MirrorOp *op = opaque; >> MirrorBlockJob *s = op->s; >> >> aio_context_acquire(blk_get_aio_context(s->common.blk)); >> @@ -158,9 +160,8 @@ static void mirror_write_complete(void *opaque, int ret) >> aio_context_release(blk_get_aio_context(s->common.blk)); >> } >> >> -static void mirror_read_complete(void *opaque, int ret) >> +static void coroutine_fn mirror_read_complete(MirrorOp *op, int ret) >> { >> - MirrorOp *op = opaque; >> MirrorBlockJob *s = op->s; >> >> aio_context_acquire(blk_get_aio_context(s->common.blk)); >> @@ -176,8 +177,11 @@ static void mirror_read_complete(void *opaque, int ret) >> >> mirror_iteration_done(op, ret); >> } else { >> - blk_aio_pwritev(s->target, op->offset, &op->qiov, >> - 0, mirror_write_complete, op); >> + int ret; >> + >> + ret = blk_co_pwritev(s->target, op->offset, >> + op->qiov.size, &op->qiov, 0); >> + mirror_write_complete(op, ret); >> } >> aio_context_release(blk_get_aio_context(s->common.blk)); >> } >> @@ -242,53 +246,49 @@ static inline void mirror_wait_for_io(MirrorBlockJob *s) >> * (new_end - offset) if tail is rounded up or down due to >> * alignment or buffer limit. >> */ >> -static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset, >> - uint64_t bytes) >> +static void coroutine_fn mirror_co_read(void *opaque) >> { >> + MirrorOp *op = opaque; >> + MirrorBlockJob *s = op->s; >> BlockBackend *source = s->common.blk; >> int nb_chunks; >> uint64_t ret; >> - MirrorOp *op; >> uint64_t max_bytes; >> >> max_bytes = s->granularity * s->max_iov; >> >> /* We can only handle as much as buf_size at a time. */ >> - bytes = MIN(s->buf_size, MIN(max_bytes, bytes)); >> - assert(bytes); >> - assert(bytes < BDRV_REQUEST_MAX_BYTES); >> - ret = bytes; >> + op->bytes = MIN(s->buf_size, MIN(max_bytes, op->bytes)); >> + assert(op->bytes); >> + assert(op->bytes < BDRV_REQUEST_MAX_BYTES); >> + op->bytes_copied = op->bytes; >> >> if (s->cow_bitmap) { >> - ret += mirror_cow_align(s, &offset, &bytes); >> + op->bytes_copied += mirror_cow_align(s, &op->offset, &op->bytes); >> } >> - assert(bytes <= s->buf_size); >> + /* Cannot exceed BDRV_REQUEST_MAX_BYTES + INT_MAX */ >> + assert(op->bytes_copied <= UINT_MAX); >> + assert(op->bytes <= s->buf_size); >> /* The offset is granularity-aligned because: >> * 1) Caller passes in aligned values; >> * 2) mirror_cow_align is used only when target cluster is larger. */ >> - assert(QEMU_IS_ALIGNED(offset, s->granularity)); >> + assert(QEMU_IS_ALIGNED(op->offset, s->granularity)); >> /* The range is sector-aligned, since bdrv_getlength() rounds up. */ >> - assert(QEMU_IS_ALIGNED(bytes, BDRV_SECTOR_SIZE)); >> - nb_chunks = DIV_ROUND_UP(bytes, s->granularity); >> + assert(QEMU_IS_ALIGNED(op->bytes, BDRV_SECTOR_SIZE)); >> + nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity); >> >> while (s->buf_free_count < nb_chunks) { >> - trace_mirror_yield_in_flight(s, offset, s->in_flight); >> + trace_mirror_yield_in_flight(s, op->offset, s->in_flight); >> mirror_wait_for_io(s); >> } >> >> - /* Allocate a MirrorOp that is used as an AIO callback. */ >> - op = g_new(MirrorOp, 1); >> - op->s = s; >> - op->offset = offset; >> - op->bytes = bytes; >> - >> /* Now make a QEMUIOVector taking enough granularity-sized chunks >> * from s->buf_free. >> */ >> qemu_iovec_init(&op->qiov, nb_chunks); >> while (nb_chunks-- > 0) { >> MirrorBuffer *buf = QSIMPLEQ_FIRST(&s->buf_free); >> - size_t remaining = bytes - op->qiov.size; >> + size_t remaining = op->bytes - op->qiov.size; >> >> QSIMPLEQ_REMOVE_HEAD(&s->buf_free, next); >> s->buf_free_count--; >> @@ -297,53 +297,75 @@ static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset, >> >> /* Copy the dirty cluster. */ >> s->in_flight++; >> - s->bytes_in_flight += bytes; >> - trace_mirror_one_iteration(s, offset, bytes); >> + s->bytes_in_flight += op->bytes; >> + trace_mirror_one_iteration(s, op->offset, op->bytes); >> >> - blk_aio_preadv(source, offset, &op->qiov, 0, mirror_read_complete, op); >> - return ret; >> + ret = blk_co_preadv(source, op->offset, op->bytes, &op->qiov, 0); >> + mirror_read_complete(op, ret); >> } >> >> -static void mirror_do_zero_or_discard(MirrorBlockJob *s, >> - int64_t offset, >> - uint64_t bytes, >> - bool is_discard) >> +static void coroutine_fn mirror_co_zero(void *opaque) >> { >> - MirrorOp *op; >> + MirrorOp *op = opaque; >> + int ret; >> >> - /* Allocate a MirrorOp that is used as an AIO callback. The qiov is zeroed >> - * so the freeing in mirror_iteration_done is nop. */ >> - op = g_new0(MirrorOp, 1); >> - op->s = s; >> - op->offset = offset; >> - op->bytes = bytes; >> + op->s->in_flight++; >> + op->s->bytes_in_flight += op->bytes; >> >> - s->in_flight++; >> - s->bytes_in_flight += bytes; >> - if (is_discard) { >> - blk_aio_pdiscard(s->target, offset, >> - op->bytes, mirror_write_complete, op); >> - } else { >> - blk_aio_pwrite_zeroes(s->target, offset, >> - op->bytes, s->unmap ? BDRV_REQ_MAY_UNMAP : 0, >> - mirror_write_complete, op); >> - } >> + ret = blk_co_pwrite_zeroes(op->s->target, op->offset, op->bytes, >> + op->s->unmap ? BDRV_REQ_MAY_UNMAP : 0); >> + mirror_write_complete(op, ret); >> +} >> + >> +static void coroutine_fn mirror_co_discard(void *opaque) >> +{ >> + MirrorOp *op = opaque; >> + int ret; >> + >> + op->s->in_flight++; >> + op->s->bytes_in_flight += op->bytes; >> + >> + ret = blk_co_pdiscard(op->s->target, op->offset, op->bytes); >> + mirror_write_complete(op, ret); >> } >> >> static unsigned mirror_perform(MirrorBlockJob *s, int64_t offset, >> unsigned bytes, MirrorMethod mirror_method) >> { >> + MirrorOp *op; >> + Coroutine *co; >> + unsigned ret = bytes; >> + >> + op = g_new(MirrorOp, 1); >> + *op = (MirrorOp){ >> + .s = s, >> + .offset = offset, >> + .bytes = bytes, >> + }; >> + >> switch (mirror_method) { >> case MIRROR_METHOD_COPY: >> - return mirror_do_read(s, offset, bytes); >> + co = qemu_coroutine_create(mirror_co_read, op); >> + break; >> case MIRROR_METHOD_ZERO: >> + co = qemu_coroutine_create(mirror_co_zero, op); >> + break; >> case MIRROR_METHOD_DISCARD: >> - mirror_do_zero_or_discard(s, offset, bytes, >> - mirror_method == MIRROR_METHOD_DISCARD); >> - return bytes; >> + co = qemu_coroutine_create(mirror_co_discard, op); >> + break; >> default: >> abort(); >> } >> + >> + qemu_coroutine_enter(co); >> + >> + if (mirror_method == MIRROR_METHOD_COPY) { >> + /* Same assertion as in mirror_co_read() */ >> + assert(op->bytes_copied <= UINT_MAX); >> + ret = op->bytes_copied; >> + } > > This special casing is a bit ugly. Can you just make mirror_co_zero and > mirror_co_discard set op->bytes_copied too? (and perhaps rename to > op->bytes_handled) If so the comment in MirrorOp needs an update too. Sure. > And is it better to initialize it to -1 before entering coroutine, then assert > it is != -1 afterwards? Sounds good, will do. Max
Am 13.09.2017 um 20:18 hat Max Reitz geschrieben: > In order to talk to the source BDS (and maybe in the future to the > target BDS as well) directly, we need to convert our existing AIO > requests into coroutine I/O requests. > > Signed-off-by: Max Reitz <mreitz@redhat.com> Please follow through with it and add a few patches that turn it into natural coroutine code rather than just any coroutine code. I know I did the same kind of half-assed conversion in qed, but mirror is code that is actually used and that people look at for more than just a bad example. You'll probably notice more things when you do this, but the obvious things would be changing mirror_co_read() into a mirror_co_copy() with the former callbacks inlined; keeping op on the stack instead of mallocing it in mirror_perform() and free it deep inside the nested functions that used to be callbacks; and probably also cleaning up the random calls to aio_context_acquire/release() that will now appear in the middle of the function. Anyway, that's for follow-up patches (though ideally in the same series), so for this one you can have: Reviewed-by: Kevin Wolf <kwolf@redhat.com>
On 2017-10-10 11:14, Kevin Wolf wrote: > Am 13.09.2017 um 20:18 hat Max Reitz geschrieben: >> In order to talk to the source BDS (and maybe in the future to the >> target BDS as well) directly, we need to convert our existing AIO >> requests into coroutine I/O requests. >> >> Signed-off-by: Max Reitz <mreitz@redhat.com> > > Please follow through with it and add a few patches that turn it into > natural coroutine code rather than just any coroutine code. I know I did > the same kind of half-assed conversion in qed, but mirror is code that > is actually used and that people look at for more than just a bad > example. > > You'll probably notice more things when you do this, but the obvious > things would be changing mirror_co_read() into a mirror_co_copy() with > the former callbacks inlined; keeping op on the stack instead of > mallocing it in mirror_perform() and free it deep inside the nested > functions that used to be callbacks; and probably also cleaning up the > random calls to aio_context_acquire/release() that will now appear in > the middle of the function. > > Anyway, that's for follow-up patches (though ideally in the same > series), so for this one you can have: > > Reviewed-by: Kevin Wolf <kwolf@redhat.com> Phew. :-) I think I'll write the patches (while working on v2), but I'll send them as a follow-up. Max
diff --git a/block/mirror.c b/block/mirror.c index 4664b0516f..2b3297aa61 100644 --- a/block/mirror.c +++ b/block/mirror.c @@ -80,6 +80,9 @@ typedef struct MirrorOp { QEMUIOVector qiov; int64_t offset; uint64_t bytes; + + /* Set by mirror_co_read() before yielding for the first time */ + uint64_t bytes_copied; } MirrorOp; typedef enum MirrorMethod { @@ -101,7 +104,7 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read, } } -static void mirror_iteration_done(MirrorOp *op, int ret) +static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret) { MirrorBlockJob *s = op->s; struct iovec *iov; @@ -138,9 +141,8 @@ static void mirror_iteration_done(MirrorOp *op, int ret) } } -static void mirror_write_complete(void *opaque, int ret) +static void coroutine_fn mirror_write_complete(MirrorOp *op, int ret) { - MirrorOp *op = opaque; MirrorBlockJob *s = op->s; aio_context_acquire(blk_get_aio_context(s->common.blk)); @@ -158,9 +160,8 @@ static void mirror_write_complete(void *opaque, int ret) aio_context_release(blk_get_aio_context(s->common.blk)); } -static void mirror_read_complete(void *opaque, int ret) +static void coroutine_fn mirror_read_complete(MirrorOp *op, int ret) { - MirrorOp *op = opaque; MirrorBlockJob *s = op->s; aio_context_acquire(blk_get_aio_context(s->common.blk)); @@ -176,8 +177,11 @@ static void mirror_read_complete(void *opaque, int ret) mirror_iteration_done(op, ret); } else { - blk_aio_pwritev(s->target, op->offset, &op->qiov, - 0, mirror_write_complete, op); + int ret; + + ret = blk_co_pwritev(s->target, op->offset, + op->qiov.size, &op->qiov, 0); + mirror_write_complete(op, ret); } aio_context_release(blk_get_aio_context(s->common.blk)); } @@ -242,53 +246,49 @@ static inline void mirror_wait_for_io(MirrorBlockJob *s) * (new_end - offset) if tail is rounded up or down due to * alignment or buffer limit. */ -static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset, - uint64_t bytes) +static void coroutine_fn mirror_co_read(void *opaque) { + MirrorOp *op = opaque; + MirrorBlockJob *s = op->s; BlockBackend *source = s->common.blk; int nb_chunks; uint64_t ret; - MirrorOp *op; uint64_t max_bytes; max_bytes = s->granularity * s->max_iov; /* We can only handle as much as buf_size at a time. */ - bytes = MIN(s->buf_size, MIN(max_bytes, bytes)); - assert(bytes); - assert(bytes < BDRV_REQUEST_MAX_BYTES); - ret = bytes; + op->bytes = MIN(s->buf_size, MIN(max_bytes, op->bytes)); + assert(op->bytes); + assert(op->bytes < BDRV_REQUEST_MAX_BYTES); + op->bytes_copied = op->bytes; if (s->cow_bitmap) { - ret += mirror_cow_align(s, &offset, &bytes); + op->bytes_copied += mirror_cow_align(s, &op->offset, &op->bytes); } - assert(bytes <= s->buf_size); + /* Cannot exceed BDRV_REQUEST_MAX_BYTES + INT_MAX */ + assert(op->bytes_copied <= UINT_MAX); + assert(op->bytes <= s->buf_size); /* The offset is granularity-aligned because: * 1) Caller passes in aligned values; * 2) mirror_cow_align is used only when target cluster is larger. */ - assert(QEMU_IS_ALIGNED(offset, s->granularity)); + assert(QEMU_IS_ALIGNED(op->offset, s->granularity)); /* The range is sector-aligned, since bdrv_getlength() rounds up. */ - assert(QEMU_IS_ALIGNED(bytes, BDRV_SECTOR_SIZE)); - nb_chunks = DIV_ROUND_UP(bytes, s->granularity); + assert(QEMU_IS_ALIGNED(op->bytes, BDRV_SECTOR_SIZE)); + nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity); while (s->buf_free_count < nb_chunks) { - trace_mirror_yield_in_flight(s, offset, s->in_flight); + trace_mirror_yield_in_flight(s, op->offset, s->in_flight); mirror_wait_for_io(s); } - /* Allocate a MirrorOp that is used as an AIO callback. */ - op = g_new(MirrorOp, 1); - op->s = s; - op->offset = offset; - op->bytes = bytes; - /* Now make a QEMUIOVector taking enough granularity-sized chunks * from s->buf_free. */ qemu_iovec_init(&op->qiov, nb_chunks); while (nb_chunks-- > 0) { MirrorBuffer *buf = QSIMPLEQ_FIRST(&s->buf_free); - size_t remaining = bytes - op->qiov.size; + size_t remaining = op->bytes - op->qiov.size; QSIMPLEQ_REMOVE_HEAD(&s->buf_free, next); s->buf_free_count--; @@ -297,53 +297,75 @@ static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset, /* Copy the dirty cluster. */ s->in_flight++; - s->bytes_in_flight += bytes; - trace_mirror_one_iteration(s, offset, bytes); + s->bytes_in_flight += op->bytes; + trace_mirror_one_iteration(s, op->offset, op->bytes); - blk_aio_preadv(source, offset, &op->qiov, 0, mirror_read_complete, op); - return ret; + ret = blk_co_preadv(source, op->offset, op->bytes, &op->qiov, 0); + mirror_read_complete(op, ret); } -static void mirror_do_zero_or_discard(MirrorBlockJob *s, - int64_t offset, - uint64_t bytes, - bool is_discard) +static void coroutine_fn mirror_co_zero(void *opaque) { - MirrorOp *op; + MirrorOp *op = opaque; + int ret; - /* Allocate a MirrorOp that is used as an AIO callback. The qiov is zeroed - * so the freeing in mirror_iteration_done is nop. */ - op = g_new0(MirrorOp, 1); - op->s = s; - op->offset = offset; - op->bytes = bytes; + op->s->in_flight++; + op->s->bytes_in_flight += op->bytes; - s->in_flight++; - s->bytes_in_flight += bytes; - if (is_discard) { - blk_aio_pdiscard(s->target, offset, - op->bytes, mirror_write_complete, op); - } else { - blk_aio_pwrite_zeroes(s->target, offset, - op->bytes, s->unmap ? BDRV_REQ_MAY_UNMAP : 0, - mirror_write_complete, op); - } + ret = blk_co_pwrite_zeroes(op->s->target, op->offset, op->bytes, + op->s->unmap ? BDRV_REQ_MAY_UNMAP : 0); + mirror_write_complete(op, ret); +} + +static void coroutine_fn mirror_co_discard(void *opaque) +{ + MirrorOp *op = opaque; + int ret; + + op->s->in_flight++; + op->s->bytes_in_flight += op->bytes; + + ret = blk_co_pdiscard(op->s->target, op->offset, op->bytes); + mirror_write_complete(op, ret); } static unsigned mirror_perform(MirrorBlockJob *s, int64_t offset, unsigned bytes, MirrorMethod mirror_method) { + MirrorOp *op; + Coroutine *co; + unsigned ret = bytes; + + op = g_new(MirrorOp, 1); + *op = (MirrorOp){ + .s = s, + .offset = offset, + .bytes = bytes, + }; + switch (mirror_method) { case MIRROR_METHOD_COPY: - return mirror_do_read(s, offset, bytes); + co = qemu_coroutine_create(mirror_co_read, op); + break; case MIRROR_METHOD_ZERO: + co = qemu_coroutine_create(mirror_co_zero, op); + break; case MIRROR_METHOD_DISCARD: - mirror_do_zero_or_discard(s, offset, bytes, - mirror_method == MIRROR_METHOD_DISCARD); - return bytes; + co = qemu_coroutine_create(mirror_co_discard, op); + break; default: abort(); } + + qemu_coroutine_enter(co); + + if (mirror_method == MIRROR_METHOD_COPY) { + /* Same assertion as in mirror_co_read() */ + assert(op->bytes_copied <= UINT_MAX); + ret = op->bytes_copied; + } + + return ret; } static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)
In order to talk to the source BDS (and maybe in the future to the target BDS as well) directly, we need to convert our existing AIO requests into coroutine I/O requests. Signed-off-by: Max Reitz <mreitz@redhat.com> --- block/mirror.c | 134 +++++++++++++++++++++++++++++++++------------------------ 1 file changed, 78 insertions(+), 56 deletions(-)