Message ID | 20180122220806.22154-6-mreitz@redhat.com (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
On Mon, 01/22 23:07, Max Reitz wrote: > @@ -101,7 +105,7 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read, > } > } > > -static void mirror_iteration_done(MirrorOp *op, int ret) > +static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret) > { > MirrorBlockJob *s = op->s; > struct iovec *iov; I think we want s/qemu_coroutine_enter/aio_co_wake/ in mirror_iteration_done(). As an AIO callback before, this didn't matter, but now we are in an terminating coroutine, so it is pointless to defer the termination, or even risky in that we are in a aio_context_acquire/release section, but have already decremented s->in_flight, which is fishy. > @@ -138,9 +142,8 @@ static void mirror_iteration_done(MirrorOp *op, int ret) > } > } > > -static void mirror_write_complete(void *opaque, int ret) > +static void coroutine_fn mirror_write_complete(MirrorOp *op, int ret) > { > - MirrorOp *op = opaque; > MirrorBlockJob *s = op->s; > > aio_context_acquire(blk_get_aio_context(s->common.blk)); > @@ -157,9 +160,8 @@ static void mirror_write_complete(void *opaque, int ret) > aio_context_release(blk_get_aio_context(s->common.blk)); > } > > -static void mirror_read_complete(void *opaque, int ret) > +static void coroutine_fn mirror_read_complete(MirrorOp *op, int ret) > { > - MirrorOp *op = opaque; > MirrorBlockJob *s = op->s; > > aio_context_acquire(blk_get_aio_context(s->common.blk)); > @@ -174,8 +176,11 @@ static void mirror_read_complete(void *opaque, int ret) > > mirror_iteration_done(op, ret); > } else { > - blk_aio_pwritev(s->target, op->offset, &op->qiov, > - 0, mirror_write_complete, op); > + int ret; s/ret/ret2/ or drop the definition? because ret is already the paramter of the function. > + > + ret = blk_co_pwritev(s->target, op->offset, > + op->qiov.size, &op->qiov, 0); > + mirror_write_complete(op, ret); > } > aio_context_release(blk_get_aio_context(s->common.blk)); > } <snip> > +static void coroutine_fn mirror_co_discard(void *opaque) > +{ > + MirrorOp *op = opaque; > + int ret; > + > + op->s->in_flight++; > + op->s->bytes_in_flight += op->bytes; > + *op->bytes_handled = op->bytes; > + > + ret = blk_co_pdiscard(op->s->target, op->offset, op->bytes); > + mirror_write_complete(op, ret); > } > > static unsigned mirror_perform(MirrorBlockJob *s, int64_t offset, > unsigned bytes, MirrorMethod mirror_method) Doesn't mirror_perform need coroutine_fn annotation too? > { Fam
On 2018-02-27 08:44, Fam Zheng wrote: > On Mon, 01/22 23:07, Max Reitz wrote: >> @@ -101,7 +105,7 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read, >> } >> } >> >> -static void mirror_iteration_done(MirrorOp *op, int ret) >> +static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret) >> { >> MirrorBlockJob *s = op->s; >> struct iovec *iov; > > I think we want s/qemu_coroutine_enter/aio_co_wake/ in mirror_iteration_done(). > As an AIO callback before, this didn't matter, but now we are in an terminating > coroutine, so it is pointless to defer the termination, or even risky in that we > are in a aio_context_acquire/release section, but have already decremented > s->in_flight, which is fishy. I guess I'll still do the replacement, regardless of whether the next patch overwrites it again... >> @@ -138,9 +142,8 @@ static void mirror_iteration_done(MirrorOp *op, int ret) >> } >> } >> >> -static void mirror_write_complete(void *opaque, int ret) >> +static void coroutine_fn mirror_write_complete(MirrorOp *op, int ret) >> { >> - MirrorOp *op = opaque; >> MirrorBlockJob *s = op->s; >> >> aio_context_acquire(blk_get_aio_context(s->common.blk)); >> @@ -157,9 +160,8 @@ static void mirror_write_complete(void *opaque, int ret) >> aio_context_release(blk_get_aio_context(s->common.blk)); >> } >> >> -static void mirror_read_complete(void *opaque, int ret) >> +static void coroutine_fn mirror_read_complete(MirrorOp *op, int ret) >> { >> - MirrorOp *op = opaque; >> MirrorBlockJob *s = op->s; >> >> aio_context_acquire(blk_get_aio_context(s->common.blk)); >> @@ -174,8 +176,11 @@ static void mirror_read_complete(void *opaque, int ret) >> >> mirror_iteration_done(op, ret); >> } else { >> - blk_aio_pwritev(s->target, op->offset, &op->qiov, >> - 0, mirror_write_complete, op); >> + int ret; > > s/ret/ret2/ or drop the definition? > because ret is already the paramter of the function. Oh, right, yes, will do. >> + >> + ret = blk_co_pwritev(s->target, op->offset, >> + op->qiov.size, &op->qiov, 0); >> + mirror_write_complete(op, ret); >> } >> aio_context_release(blk_get_aio_context(s->common.blk)); >> } > > <snip> > >> +static void coroutine_fn mirror_co_discard(void *opaque) >> +{ >> + MirrorOp *op = opaque; >> + int ret; >> + >> + op->s->in_flight++; >> + op->s->bytes_in_flight += op->bytes; >> + *op->bytes_handled = op->bytes; >> + >> + ret = blk_co_pdiscard(op->s->target, op->offset, op->bytes); >> + mirror_write_complete(op, ret); >> } >> >> static unsigned mirror_perform(MirrorBlockJob *s, int64_t offset, >> unsigned bytes, MirrorMethod mirror_method) > > Doesn't mirror_perform need coroutine_fn annotation too? I don't think it needs one. We could give it one, but as far as I've understood (which may be wrong), all functions that need to be run from a coroutine need the tag -- but functions that may be called from either coroutines or just normal code don't need it. (And I think this function should be fine either way, so I don't think it needs a tag.) Also, thanks for reviewing! :-) Max
On 2018-02-28 15:13, Max Reitz wrote: > On 2018-02-27 08:44, Fam Zheng wrote: >> On Mon, 01/22 23:07, Max Reitz wrote: >>> @@ -101,7 +105,7 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read, >>> } >>> } >>> >>> -static void mirror_iteration_done(MirrorOp *op, int ret) >>> +static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret) >>> { >>> MirrorBlockJob *s = op->s; >>> struct iovec *iov; >> >> I think we want s/qemu_coroutine_enter/aio_co_wake/ in mirror_iteration_done(). >> As an AIO callback before, this didn't matter, but now we are in an terminating >> coroutine, so it is pointless to defer the termination, or even risky in that we >> are in a aio_context_acquire/release section, but have already decremented >> s->in_flight, which is fishy. > > I guess I'll still do the replacement, regardless of whether the next > patch overwrites it again... Maybe I don't. Doing this breaks iotest 041 because the assert(data.done) in bdrv_co_yield_to_drain() fails. Not sure why that is, but under the circumstance I guess it's best to just pretend this never happened, continue to use qemu_coroutine_enter() and just replace it in the next patch. As for in_flight: What is the issue there? We mostly need that to know how many I/O requests are actually running, that is, how much buffer space is used, how many I/O is done concurrently, etc. (and later we need the in-flight information so that we don't access the target in overlapping areas concurrently). But it doesn't seem to be about how many coroutines there are. So as long as the s->in_flight decrement is done in the same critical section as the op is deleted, we should be good...? Max
On Wed, 02/28 18:07, Max Reitz wrote: > On 2018-02-28 15:13, Max Reitz wrote: > > On 2018-02-27 08:44, Fam Zheng wrote: > >> On Mon, 01/22 23:07, Max Reitz wrote: > >>> @@ -101,7 +105,7 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read, > >>> } > >>> } > >>> > >>> -static void mirror_iteration_done(MirrorOp *op, int ret) > >>> +static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret) > >>> { > >>> MirrorBlockJob *s = op->s; > >>> struct iovec *iov; > >> > >> I think we want s/qemu_coroutine_enter/aio_co_wake/ in mirror_iteration_done(). > >> As an AIO callback before, this didn't matter, but now we are in an terminating > >> coroutine, so it is pointless to defer the termination, or even risky in that we > >> are in a aio_context_acquire/release section, but have already decremented > >> s->in_flight, which is fishy. > > > > I guess I'll still do the replacement, regardless of whether the next > > patch overwrites it again... > > Maybe I don't. Doing this breaks iotest 041 because the > assert(data.done) in bdrv_co_yield_to_drain() fails. > > Not sure why that is, but under the circumstance I guess it's best to > just pretend this never happened, continue to use qemu_coroutine_enter() > and just replace it in the next patch. > > As for in_flight: What is the issue there? We mostly need that to know > how many I/O requests are actually running, that is, how much buffer > space is used, how many I/O is done concurrently, etc. (and later we > need the in-flight information so that we don't access the target in > overlapping areas concurrently). But it doesn't seem to be about how > many coroutines there are. > > So as long as the s->in_flight decrement is done in the same critical > section as the op is deleted, we should be good...? I don't have a specific problem in my mind but is just generally concerned about the "if (s->in_flight == 0)" checks around mirror_exit. Fam
diff --git a/block/mirror.c b/block/mirror.c index 4066788ee2..71a8e66850 100644 --- a/block/mirror.c +++ b/block/mirror.c @@ -80,6 +80,10 @@ typedef struct MirrorOp { QEMUIOVector qiov; int64_t offset; uint64_t bytes; + + /* The pointee is set by mirror_co_read(), mirror_co_zero(), and + * mirror_co_discard() before yielding for the first time */ + int64_t *bytes_handled; } MirrorOp; typedef enum MirrorMethod { @@ -101,7 +105,7 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read, } } -static void mirror_iteration_done(MirrorOp *op, int ret) +static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret) { MirrorBlockJob *s = op->s; struct iovec *iov; @@ -138,9 +142,8 @@ static void mirror_iteration_done(MirrorOp *op, int ret) } } -static void mirror_write_complete(void *opaque, int ret) +static void coroutine_fn mirror_write_complete(MirrorOp *op, int ret) { - MirrorOp *op = opaque; MirrorBlockJob *s = op->s; aio_context_acquire(blk_get_aio_context(s->common.blk)); @@ -157,9 +160,8 @@ static void mirror_write_complete(void *opaque, int ret) aio_context_release(blk_get_aio_context(s->common.blk)); } -static void mirror_read_complete(void *opaque, int ret) +static void coroutine_fn mirror_read_complete(MirrorOp *op, int ret) { - MirrorOp *op = opaque; MirrorBlockJob *s = op->s; aio_context_acquire(blk_get_aio_context(s->common.blk)); @@ -174,8 +176,11 @@ static void mirror_read_complete(void *opaque, int ret) mirror_iteration_done(op, ret); } else { - blk_aio_pwritev(s->target, op->offset, &op->qiov, - 0, mirror_write_complete, op); + int ret; + + ret = blk_co_pwritev(s->target, op->offset, + op->qiov.size, &op->qiov, 0); + mirror_write_complete(op, ret); } aio_context_release(blk_get_aio_context(s->common.blk)); } @@ -232,60 +237,57 @@ static inline void mirror_wait_for_io(MirrorBlockJob *s) s->waiting_for_io = false; } -/* Submit async read while handling COW. - * Returns: The number of bytes copied after and including offset, - * excluding any bytes copied prior to offset due to alignment. - * This will be @bytes if no alignment is necessary, or - * (new_end - offset) if tail is rounded up or down due to - * alignment or buffer limit. +/* Perform a mirror copy operation. + * + * *op->bytes_handled is set to the number of bytes copied after and + * including offset, excluding any bytes copied prior to offset due + * to alignment. This will be op->bytes if no alignment is necessary, + * or (new_end - op->offset) if the tail is rounded up or down due to + * alignment or buffer limit. */ -static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset, - uint64_t bytes) +static void coroutine_fn mirror_co_read(void *opaque) { + MirrorOp *op = opaque; + MirrorBlockJob *s = op->s; BlockBackend *source = s->common.blk; int nb_chunks; uint64_t ret; - MirrorOp *op; uint64_t max_bytes; max_bytes = s->granularity * s->max_iov; /* We can only handle as much as buf_size at a time. */ - bytes = MIN(s->buf_size, MIN(max_bytes, bytes)); - assert(bytes); - assert(bytes < BDRV_REQUEST_MAX_BYTES); - ret = bytes; + op->bytes = MIN(s->buf_size, MIN(max_bytes, op->bytes)); + assert(op->bytes); + assert(op->bytes < BDRV_REQUEST_MAX_BYTES); + *op->bytes_handled = op->bytes; if (s->cow_bitmap) { - ret += mirror_cow_align(s, &offset, &bytes); + *op->bytes_handled += mirror_cow_align(s, &op->offset, &op->bytes); } - assert(bytes <= s->buf_size); + /* Cannot exceed BDRV_REQUEST_MAX_BYTES + INT_MAX */ + assert(*op->bytes_handled <= UINT_MAX); + assert(op->bytes <= s->buf_size); /* The offset is granularity-aligned because: * 1) Caller passes in aligned values; * 2) mirror_cow_align is used only when target cluster is larger. */ - assert(QEMU_IS_ALIGNED(offset, s->granularity)); + assert(QEMU_IS_ALIGNED(op->offset, s->granularity)); /* The range is sector-aligned, since bdrv_getlength() rounds up. */ - assert(QEMU_IS_ALIGNED(bytes, BDRV_SECTOR_SIZE)); - nb_chunks = DIV_ROUND_UP(bytes, s->granularity); + assert(QEMU_IS_ALIGNED(op->bytes, BDRV_SECTOR_SIZE)); + nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity); while (s->buf_free_count < nb_chunks) { - trace_mirror_yield_in_flight(s, offset, s->in_flight); + trace_mirror_yield_in_flight(s, op->offset, s->in_flight); mirror_wait_for_io(s); } - /* Allocate a MirrorOp that is used as an AIO callback. */ - op = g_new(MirrorOp, 1); - op->s = s; - op->offset = offset; - op->bytes = bytes; - /* Now make a QEMUIOVector taking enough granularity-sized chunks * from s->buf_free. */ qemu_iovec_init(&op->qiov, nb_chunks); while (nb_chunks-- > 0) { MirrorBuffer *buf = QSIMPLEQ_FIRST(&s->buf_free); - size_t remaining = bytes - op->qiov.size; + size_t remaining = op->bytes - op->qiov.size; QSIMPLEQ_REMOVE_HEAD(&s->buf_free, next); s->buf_free_count--; @@ -294,53 +296,81 @@ static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset, /* Copy the dirty cluster. */ s->in_flight++; - s->bytes_in_flight += bytes; - trace_mirror_one_iteration(s, offset, bytes); + s->bytes_in_flight += op->bytes; + trace_mirror_one_iteration(s, op->offset, op->bytes); - blk_aio_preadv(source, offset, &op->qiov, 0, mirror_read_complete, op); - return ret; + ret = blk_co_preadv(source, op->offset, op->bytes, &op->qiov, 0); + mirror_read_complete(op, ret); } -static void mirror_do_zero_or_discard(MirrorBlockJob *s, - int64_t offset, - uint64_t bytes, - bool is_discard) +static void coroutine_fn mirror_co_zero(void *opaque) { - MirrorOp *op; + MirrorOp *op = opaque; + int ret; - /* Allocate a MirrorOp that is used as an AIO callback. The qiov is zeroed - * so the freeing in mirror_iteration_done is nop. */ - op = g_new0(MirrorOp, 1); - op->s = s; - op->offset = offset; - op->bytes = bytes; + op->s->in_flight++; + op->s->bytes_in_flight += op->bytes; + *op->bytes_handled = op->bytes; - s->in_flight++; - s->bytes_in_flight += bytes; - if (is_discard) { - blk_aio_pdiscard(s->target, offset, - op->bytes, mirror_write_complete, op); - } else { - blk_aio_pwrite_zeroes(s->target, offset, - op->bytes, s->unmap ? BDRV_REQ_MAY_UNMAP : 0, - mirror_write_complete, op); - } + ret = blk_co_pwrite_zeroes(op->s->target, op->offset, op->bytes, + op->s->unmap ? BDRV_REQ_MAY_UNMAP : 0); + mirror_write_complete(op, ret); +} + +static void coroutine_fn mirror_co_discard(void *opaque) +{ + MirrorOp *op = opaque; + int ret; + + op->s->in_flight++; + op->s->bytes_in_flight += op->bytes; + *op->bytes_handled = op->bytes; + + ret = blk_co_pdiscard(op->s->target, op->offset, op->bytes); + mirror_write_complete(op, ret); } static unsigned mirror_perform(MirrorBlockJob *s, int64_t offset, unsigned bytes, MirrorMethod mirror_method) { + MirrorOp *op; + Coroutine *co; + int64_t bytes_handled = -1; + + op = g_new(MirrorOp, 1); + *op = (MirrorOp){ + .s = s, + .offset = offset, + .bytes = bytes, + .bytes_handled = &bytes_handled, + }; + switch (mirror_method) { case MIRROR_METHOD_COPY: - return mirror_do_read(s, offset, bytes); + co = qemu_coroutine_create(mirror_co_read, op); + break; case MIRROR_METHOD_ZERO: + co = qemu_coroutine_create(mirror_co_zero, op); + break; case MIRROR_METHOD_DISCARD: - mirror_do_zero_or_discard(s, offset, bytes, - mirror_method == MIRROR_METHOD_DISCARD); - return bytes; + co = qemu_coroutine_create(mirror_co_discard, op); + break; default: abort(); } + + qemu_coroutine_enter(co); + /* At this point, ownership of op has been moved to the coroutine + * and the object may already be freed */ + + /* Assert that this value has been set */ + assert(bytes_handled >= 0); + + /* Same assertion as in mirror_co_read() (and for mirror_co_read() + * and mirror_co_discard(), bytes_handled == op->bytes, which + * is the @bytes parameter given to this function) */ + assert(bytes_handled <= UINT_MAX); + return bytes_handled; } static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)
In order to talk to the source BDS (and maybe in the future to the target BDS as well) directly, we need to convert our existing AIO requests into coroutine I/O requests. Signed-off-by: Max Reitz <mreitz@redhat.com> --- block/mirror.c | 154 ++++++++++++++++++++++++++++++++++----------------------- 1 file changed, 92 insertions(+), 62 deletions(-)