[v11,13/13] block: apply COR-filter to block-stream jobs

Message ID 1602524605-481160-14-git-send-email-andrey.shinkevich@virtuozzo.com (mailing list archive)
State New, archived
Headers show
Series Apply COR-filter to the block-stream permanently | expand

Commit Message

"Denis V. Lunev" via Oct. 12, 2020, 5:43 p.m. UTC
This patch completes the series with the COR-filter insertion for
block-stream operations. Adding the filter makes it possible to discard
copied regions from the backing files during the block-stream job,
which reduces disk space consumption.
The COR-filter insertion requires changes in the iotests case
245:test_block_stream_4, which reopens the backing chain during a
block-stream job. There are changes in iotests #030 as well.
The iotests case 030:test_stream_parallel was deleted due to multiple
conflicts between concurrent job operations over the same backing
chain: the base backing node of one job is the top node of another
job and may change when the filter node is inserted into the backing
chain while both jobs are running. Another issue is that parts of
the backing chain are frozen by the running job and may not be
changed by a concurrent job when needed. The concept of parallel
jobs with common nodes is no longer considered viable.

Signed-off-by: Andrey Shinkevich <andrey.shinkevich@virtuozzo.com>
---
 block/stream.c             | 93 +++++++++++++++++++++++++++++-----------------
 tests/qemu-iotests/030     | 51 +++----------------------
 tests/qemu-iotests/030.out |  4 +-
 tests/qemu-iotests/141.out |  2 +-
 tests/qemu-iotests/245     | 19 +++++++---
 5 files changed, 81 insertions(+), 88 deletions(-)

Comments

Max Reitz Oct. 14, 2020, 4:24 p.m. UTC | #1
On 12.10.20 19:43, Andrey Shinkevich wrote:
> This patch completes the series with the COR-filter insertion for
> block-stream operations. Adding the filter makes it possible to discard
> copied regions from the backing files during the block-stream job,
> which reduces disk space consumption.
> The COR-filter insertion requires changes in the iotests case
> 245:test_block_stream_4, which reopens the backing chain during a
> block-stream job. There are changes in iotests #030 as well.
> The iotests case 030:test_stream_parallel was deleted due to multiple
> conflicts between concurrent job operations over the same backing
> chain: the base backing node of one job is the top node of another
> job and may change when the filter node is inserted into the backing
> chain while both jobs are running. Another issue is that parts of
> the backing chain are frozen by the running job and may not be
> changed by a concurrent job when needed. The concept of parallel
> jobs with common nodes is no longer considered viable.
> 
> Signed-off-by: Andrey Shinkevich <andrey.shinkevich@virtuozzo.com>
> ---
>  block/stream.c             | 93 +++++++++++++++++++++++++++++-----------------
>  tests/qemu-iotests/030     | 51 +++----------------------
>  tests/qemu-iotests/030.out |  4 +-
>  tests/qemu-iotests/141.out |  2 +-
>  tests/qemu-iotests/245     | 19 +++++++---
>  5 files changed, 81 insertions(+), 88 deletions(-)

Looks like stream_run() could be a bit streamlined now (the allocation
checking should be unnecessary; unconditionally calling
stream_populate() should be sufficient), but that is not necessary now.
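
A minimal sketch of that simplification, assuming the COR filter above
the job's BlockBackend skips already-allocated ranges on its own
(ratelimit and on_error handling elided):

    /* Hypothetical simplified main loop for stream_run(): issue
     * prefetch reads chunk by chunk and let the filter decide what
     * actually needs copying. */
    for (offset = 0; offset < len; offset += n) {
        if (job_is_cancelled(&s->common.job)) {
            break;
        }
        n = MIN(STREAM_CHUNK, len - offset);
        ret = stream_populate(blk, offset, n);  /* BDRV_REQ_PREFETCH read */
        if (ret < 0) {
            /* same BLOCKDEV_ON_ERROR_* handling as the current loop */
        }
        job_progress_update(&s->common.job, n);
    }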

> diff --git a/block/stream.c b/block/stream.c
> index d3e1812..93564db 100644
> --- a/block/stream.c
> +++ b/block/stream.c

[...]

> @@ -94,13 +94,14 @@ static void stream_clean(Job *job)
>  {
>      StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
>      BlockJob *bjob = &s->common;
> -    BlockDriverState *bs = blk_bs(bjob->blk);
> +
> +    bdrv_cor_filter_drop(s->cor_filter_bs);
>  
>      /* Reopen the image back in read-only mode if necessary */
>      if (s->bs_read_only) {
>          /* Give up write permissions before making it read-only */
>          blk_set_perm(bjob->blk, 0, BLK_PERM_ALL, &error_abort);

Perhaps it would be good to do so even before the filter is dropped.  I
don’t know whether moving bjob->blk from cor_filter_bs to target_bs
might cause problems otherwise.
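
A minimal sketch of that reordering in stream_clean(), under the
assumption that giving up the write permission first is safe (not the
patch as posted):

    /* Drop write permissions while bjob->blk still points at
     * cor_filter_bs, then remove the filter. */
    if (s->bs_read_only) {
        blk_set_perm(bjob->blk, 0, BLK_PERM_ALL, &error_abort);
    }

    bdrv_cor_filter_drop(s->cor_filter_bs);

    if (s->bs_read_only) {
        bdrv_reopen_set_read_only(s->target_bs, true, NULL);
    }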

> -        bdrv_reopen_set_read_only(bs, true, NULL);
> +        bdrv_reopen_set_read_only(s->target_bs, true, NULL);
>      }
>  }

[...]

> @@ -262,17 +249,48 @@ void stream_start(const char *job_id, BlockDriverState *bs,
>          }
>      }
>  
> -    /* Prevent concurrent jobs trying to modify the graph structure here, we
> -     * already have our own plans. Also don't allow resize as the image size is
> -     * queried only at the job start and then cached. */
> -    s = block_job_create(job_id, &stream_job_driver, NULL, bs,
> -                         basic_flags | BLK_PERM_GRAPH_MOD,
> -                         basic_flags | BLK_PERM_WRITE,
> +    QDict *opts = qdict_new();

Declaration should be done at the start of the block.

> +
> +    qdict_put_str(opts, "driver", "copy-on-read");
> +    qdict_put_str(opts, "file", bdrv_get_node_name(bs));
> +    if (base_overlay) {

@base_overlay is always non-NULL, this condition should check @base, I
think.

> +        /* Pass the base_overlay rather than base */
> +        qdict_put_str(opts, "base", base_overlay->node_name);
> +    }
> +    if (filter_node_name) {
> +        qdict_put_str(opts, "node-name", filter_node_name);
> +    }
> +
> +    cor_filter_bs = bdrv_cor_filter_append(bs, opts, BDRV_O_RDWR, errp);
> +    if (cor_filter_bs == NULL) {
> +        goto fail;
> +    }
> +
> +    if (bdrv_freeze_backing_chain(cor_filter_bs, bs, errp) < 0) {

Is there a reason why we can’t combine this with the
bdrv_freeze_backing_chain() from bs down to above_base?  I mean, the
effect should be the same, just asking.

> +        bdrv_cor_filter_drop(cor_filter_bs);
> +        cor_filter_bs = NULL;
> +        goto fail;
> +    }
> +
> +    s = block_job_create(job_id, &stream_job_driver, NULL, cor_filter_bs,
> +                         BLK_PERM_CONSISTENT_READ,
> +                         basic_flags | BLK_PERM_WRITE | BLK_PERM_GRAPH_MOD,

Not that I’m an expert on the GRAPH_MOD permission, but why is this
shared here but not below?  Shouldn’t it be the same in both cases?
(Same for taking it as a permission.)

>                           speed, creation_flags, NULL, NULL, errp);
>      if (!s) {
>          goto fail;
>      }
>  
> +    /*
> +     * Prevent concurrent jobs trying to modify the graph structure here, we
> +     * already have our own plans. Also don't allow resize as the image size is
> +     * queried only at the job start and then cached.
> +     */
> +    if (block_job_add_bdrv(&s->common, "active node", bs,
> +                           basic_flags | BLK_PERM_GRAPH_MOD,
> +                           basic_flags | BLK_PERM_WRITE, &error_abort)) {
> +        goto fail;
> +    }
> +
>      /* Block all intermediate nodes between bs and base, because they will
>       * disappear from the chain after this operation. The streaming job reads
>       * every block only once, assuming that it doesn't change, so forbid writes

[...]

> diff --git a/tests/qemu-iotests/245 b/tests/qemu-iotests/245
> index e60c832..940e85a 100755
> --- a/tests/qemu-iotests/245
> +++ b/tests/qemu-iotests/245
> @@ -899,17 +899,26 @@ class TestBlockdevReopen(iotests.QMPTestCase):
>          # make hd1 read-only and block-stream requires it to be read-write
>          # (Which error message appears depends on whether the stream job is
>          # already done with copying at this point.)

Hm.  Let’s look at the set of messages below... [1]

> -        self.reopen(opts, {},
> +        # As the COR-filter node is inserted into the backing chain with the
> +        # 'block-stream' operation, we move the options to their proper nodes.
> +        opts = hd_opts(1)

Oh, so this patch changes it so that only the subtree below hd1 is
reopened, and we don’t have to deal with the filter options.  Got it.
(I think.)

> +        opts['backing'] = hd_opts(2)
> +        opts['backing']['backing'] = None
> +        self.reopen(opts, {'read-only': True},
>              ["Can't set node 'hd1' to r/o with copy-on-read enabled",

[1]

This isn’t done anymore as of this patch.  So I don’t think this error
message can still appear.  Will some other message appear in its stead,
or is it always going to be the second one?

>               "Cannot make block node read-only, there is a writer on it"])
>  
>          # We can't remove hd2 while the stream job is ongoing
> -        opts['backing']['backing'] = None
> -        self.reopen(opts, {'backing.read-only': False}, "Cannot change 'backing' link from 'hd1' to 'hd2'")
> +        opts['backing'] = None
> +        self.reopen(opts, {'read-only': False},
> +                    "Cannot change 'backing' link from 'hd1' to 'hd2'")
>  
> -        # We can detach hd1 from hd0 because it doesn't affect the stream job
> +        # We can't detach hd1 from hd0 because there is the COR-filter implicit
> +        # node in between.
> +        opts = hd_opts(0)
>          opts['backing'] = None
> -        self.reopen(opts)
> +        self.reopen(opts, {},
> +                    "Cannot change backing link if 'hd0' has an implicit backing file")

Does “has an implicit backing file” mean that hd0 has an implicit node
(the COR filter) as its backing file?  And then reopening isn’t allowed
because the user supposedly doesn’t know about that implicit node?  If
so, makes sense.

Max

>  
>          self.vm.run_job('stream0', auto_finalize = False, auto_dismiss = True)
>  
>
Andrey Shinkevich Oct. 15, 2020, 5:16 p.m. UTC | #2
On 14.10.2020 19:24, Max Reitz wrote:
> On 12.10.20 19:43, Andrey Shinkevich wrote:

[...]

>> ---
>>   block/stream.c             | 93 +++++++++++++++++++++++++++++-----------------
>>   tests/qemu-iotests/030     | 51 +++----------------------
>>   tests/qemu-iotests/030.out |  4 +-
>>   tests/qemu-iotests/141.out |  2 +-
>>   tests/qemu-iotests/245     | 19 +++++++---
>>   5 files changed, 81 insertions(+), 88 deletions(-)
> 
> Looks like stream_run() could be a bit streamlined now (the allocation
> checking should be unnecessary, unconditionally calling
> stream_populate() should be sufficient), but not necessary now.
> 

That is what I had kept in mind when I tackled this patch. But there
is a hidden reef in streamlining it: how does the block-stream job
learn about a long unallocated tail so that it can exit the loop in
stream_run() early? Shall we return '-EOF' or another error code from
cor_co_preadv_part() to be handled by stream_run()? Any other
suggestions will be appreciated.
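
One hypothetical shape of that idea on the job side; the -ENODATA
value and its meaning are assumptions for illustration, not part of
this series:

    ret = stream_populate(blk, offset, n);
    if (ret == -ENODATA) {
        /* Hypothetical: cor_co_preadv_part() reports that everything
         * from 'offset' up to EOF is unallocated, so the job can
         * leave the copy loop early. */
        break;
    }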

>> diff --git a/block/stream.c b/block/stream.c
>> index d3e1812..93564db 100644
>> --- a/block/stream.c
>> +++ b/block/stream.c
> 
> [...]

>> +
>> +    cor_filter_bs = bdrv_cor_filter_append(bs, opts, BDRV_O_RDWR, errp);
>> +    if (cor_filter_bs == NULL) {
>> +        goto fail;
>> +    }
>> +
>> +    if (bdrv_freeze_backing_chain(cor_filter_bs, bs, errp) < 0) {
> 
> Is there a reason why we can’t combine this with the
> bdrv_freeze_backing_chain() from bs down to above_base?  I mean, the
> effect should be the same, just asking.
> 

bdrv_freeze_backing_chain(bs, above_base, errp) is called before
bdrv_reopen_set_read_only() to keep the backing chain safe across the
context switch. Then we will want to freeze the 'COR -> top BS' link as
well. Freezing/unfreezing the parts is simpler to manage than doing that
with the whole chain.
If we decided to invoke bdrv_reopen_set_read_only() after freezing
the backing chain together with the COR-filter, we would not be able to
get the 'write' permission on the read-only node.
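
In other words, the ordering described above is roughly (a sketch of
the call sequence, not the literal patch code):

    bdrv_freeze_backing_chain(bs, above_base, errp);     /* before reopen */
    bdrv_reopen_set_read_only(bs, false, errp);          /* needs the chain frozen */
    cor_filter_bs = bdrv_cor_filter_append(bs, opts, BDRV_O_RDWR, errp);
    bdrv_freeze_backing_chain(cor_filter_bs, bs, errp);  /* freeze COR -> top link */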


>> +        bdrv_cor_filter_drop(cor_filter_bs);
>> +        cor_filter_bs = NULL;
>> +        goto fail;
>> +    }
>> +
>> +    s = block_job_create(job_id, &stream_job_driver, NULL, cor_filter_bs,
>> +                         BLK_PERM_CONSISTENT_READ,
>> +                         basic_flags | BLK_PERM_WRITE | BLK_PERM_GRAPH_MOD,
> 
> Not that I’m an expert on the GRAPH_MOD permission, but why is this
> shared here but not below?  Shouldn’t it be the same in both cases?
> (Same for taking it as a permission.)
> 

When we invoke block_job_add_bdrv(&s->common, "active node", bs, ...)
below (particularly, we need it to block operations on the top node via
bdrv_op_block_all()), we ask for the GRAPH_MOD permission on the top
node. To allow that, the parent filter node should share that permission
for the underlying node. Otherwise, we get an assertion failure in
bdrv_check_update_perm() called from bdrv_replace_node() when we remove
the filter.

>>                            speed, creation_flags, NULL, NULL, errp);
>>       if (!s) {
>>           goto fail;
>>       }
>>   
>> +    /*
>> +     * Prevent concurrent jobs trying to modify the graph structure here, we
>> +     * already have our own plans. Also don't allow resize as the image size is
>> +     * queried only at the job start and then cached.
>> +     */
>> +    if (block_job_add_bdrv(&s->common, "active node", bs,
>> +                           basic_flags | BLK_PERM_GRAPH_MOD,
>> +                           basic_flags | BLK_PERM_WRITE, &error_abort)) {
>> +        goto fail;
>> +    }
>> +
>>       /* Block all intermediate nodes between bs and base, because they will
>>        * disappear from the chain after this operation. The streaming job reads
>>        * every block only once, assuming that it doesn't change, so forbid writes
> 
> [...]
> 
>> diff --git a/tests/qemu-iotests/245 b/tests/qemu-iotests/245
>> index e60c832..940e85a 100755
>> --- a/tests/qemu-iotests/245
>> +++ b/tests/qemu-iotests/245
>> @@ -899,17 +899,26 @@ class TestBlockdevReopen(iotests.QMPTestCase):
>>           # make hd1 read-only and block-stream requires it to be read-write
>>           # (Which error message appears depends on whether the stream job is
>>           # already done with copying at this point.)
> 
> Hm.  Let’s look at the set of messages below... [1]
> 
>> -        self.reopen(opts, {},
>> +        # As the COR-filter node is inserted into the backing chain with the
>> +        # 'block-stream' operation, we move the options to their proper nodes.
>> +        opts = hd_opts(1)
> 
> Oh, so this patch changes it so that only the subtree below hd1 is
> reopened, and we don’t have to deal with the filter options.  Got it.
> (I think.)
> 

Yes, that's right.

>> +        opts['backing'] = hd_opts(2)
>> +        opts['backing']['backing'] = None
>> +        self.reopen(opts, {'read-only': True},
>>               ["Can't set node 'hd1' to r/o with copy-on-read enabled",
> 
> [1]
> 
> This isn’t done anymore as of this patch.  So I don’t think this error
> message can still appear.  Will some other message appear in its stead,
> or is it always going to be the second one?
> 

Only the second message appears in the test case when I run it on my
node. So I will remove the first one, as bdrv_enable_copy_on_read()
is no longer called for the top BS of the frozen backing chain.
Also, I will delete this part of the comment:
"(Which error message appears depends on whether the stream job is
already done with copying at this point.)"

>>                "Cannot make block node read-only, there is a writer on it"])
>>   
>>           # We can't remove hd2 while the stream job is ongoing
>> -        opts['backing']['backing'] = None
>> -        self.reopen(opts, {'backing.read-only': False}, "Cannot change 'backing' link from 'hd1' to 'hd2'")
>> +        opts['backing'] = None
>> +        self.reopen(opts, {'read-only': False},
>> +                    "Cannot change 'backing' link from 'hd1' to 'hd2'")
>>   
>> -        # We can detach hd1 from hd0 because it doesn't affect the stream job
>> +        # We can't detach hd1 from hd0 because there is the COR-filter implicit
>> +        # node in between.
>> +        opts = hd_opts(0)
>>           opts['backing'] = None
>> -        self.reopen(opts)
>> +        self.reopen(opts, {},
>> +                    "Cannot change backing link if 'hd0' has an implicit backing file")
> 
> Does “has an implicit backing file” mean that hd0 has an implicit node
> (the COR filter) as its backing file?  And then reopening isn’t allowed
> because the user supposedly doesn’t know about that implicit node?  If
> so, makes sense.

Yes, it does.

Andrey

> 
> Max
> 
>>   
>>           self.vm.run_job('stream0', auto_finalize = False, auto_dismiss = True)
>>   
>>
> 
>
Andrey Shinkevich Oct. 16, 2020, 3:06 p.m. UTC | #3
On 15.10.2020 20:16, Andrey Shinkevich wrote:
> On 14.10.2020 19:24, Max Reitz wrote:
>> On 12.10.20 19:43, Andrey Shinkevich wrote:
> 
> [...]
> 
>>> ---
>>>   block/stream.c             | 93 
>>> +++++++++++++++++++++++++++++-----------------
>>>   tests/qemu-iotests/030     | 51 +++----------------------
>>>   tests/qemu-iotests/030.out |  4 +-
>>>   tests/qemu-iotests/141.out |  2 +-
>>>   tests/qemu-iotests/245     | 19 +++++++---
>>>   5 files changed, 81 insertions(+), 88 deletions(-)
>>
>> Looks like stream_run() could be a bit streamlined now (the allocation
>> checking should be unnecessary, unconditionally calling
>> stream_populate() should be sufficient), but not necessary now.
>>
> 
> That is what I had kept in mind when I tackled this patch. But there
> is a hidden reef in streamlining it: how does the block-stream job
> learn about a long unallocated tail so that it can exit the loop in
> stream_run() early? Shall we return '-EOF' or another error code from
> cor_co_preadv_part() to be handled by stream_run()? Any other
> suggestions will be appreciated.
> 
>>> diff --git a/block/stream.c b/block/stream.c
>>> index d3e1812..93564db 100644
>>> --- a/block/stream.c
>>> +++ b/block/stream.c
>>
>> [...]
> 
>>> +
>>> +    cor_filter_bs = bdrv_cor_filter_append(bs, opts, BDRV_O_RDWR, 
>>> errp);
>>> +    if (cor_filter_bs == NULL) {
>>> +        goto fail;
>>> +    }
>>> +
>>> +    if (bdrv_freeze_backing_chain(cor_filter_bs, bs, errp) < 0) {
>>
>> Is there a reason why we can’t combine this with the
>> bdrv_freeze_backing_chain() from bs down to above_base?  I mean, the
>> effect should be the same, just asking.
>>
> 
> bdrv_freeze_backing_chain(bs, above_base, errp) is called before
> bdrv_reopen_set_read_only() to keep the backing chain safe across the
> context switch. Then we will want to freeze the 'COR -> top BS' link as
> well. Freezing/unfreezing the parts is simpler to manage than doing that
> with the whole chain.
> If we decided to invoke bdrv_reopen_set_read_only() after freezing
> the backing chain together with the COR-filter, we would not be able to
> get the 'write' permission on the read-only node.
> 
> 
>>> +        bdrv_cor_filter_drop(cor_filter_bs);
>>> +        cor_filter_bs = NULL;
>>> +        goto fail;
>>> +    }
>>> +
>>> +    s = block_job_create(job_id, &stream_job_driver, NULL, 
>>> cor_filter_bs,
>>> +                         BLK_PERM_CONSISTENT_READ,
>>> +                         basic_flags | BLK_PERM_WRITE | 
>>> BLK_PERM_GRAPH_MOD,
>>
>> Not that I’m an expert on the GRAPH_MOD permission, but why is this
>> shared here but not below?  Shouldn’t it be the same in both cases?
>> (Same for taking it as a permission.)
>>
> 
> When we invoke block_job_add_bdrv(&s->common, "active node", bs, ...)
> below (particularly, we need it to block operations on the top node via
> bdrv_op_block_all()), we ask for the GRAPH_MOD permission on the top
> node. To allow that, the parent filter node should share that permission
> for the underlying node. Otherwise, we get an assertion failure in
> bdrv_check_update_perm() called from bdrv_replace_node() when we remove
> the filter.
> 

I will add my comments above to the code.

Andrey


[...]
Vladimir Sementsov-Ogievskiy Oct. 16, 2020, 3:45 p.m. UTC | #4
15.10.2020 20:16, Andrey Shinkevich wrote:
> On 14.10.2020 19:24, Max Reitz wrote:
>> On 12.10.20 19:43, Andrey Shinkevich wrote:
> 
> [...]
> 
>>> ---
>>>   block/stream.c             | 93 +++++++++++++++++++++++++++++-----------------
>>>   tests/qemu-iotests/030     | 51 +++----------------------
>>>   tests/qemu-iotests/030.out |  4 +-
>>>   tests/qemu-iotests/141.out |  2 +-
>>>   tests/qemu-iotests/245     | 19 +++++++---
>>>   5 files changed, 81 insertions(+), 88 deletions(-)
>>
>> Looks like stream_run() could be a bit streamlined now (the allocation
>> checking should be unnecessary, unconditionally calling
>> stream_populate() should be sufficient), but not necessary now.
>>
> 
> That is what I had kept in mind when I tackled this patch. But there is a hidden reef in streamlining it: how does the block-stream job learn about a long unallocated tail so that it can exit the loop in stream_run() early? Shall we return '-EOF' or another error code from cor_co_preadv_part() to be handled by stream_run()? Any other suggestions will be appreciated.

Just reading CHUNK by CHUNK may be less efficient than an is_allocated()-driven loop: you may end up splitting regions that are unaligned to the CHUNK granularity, which would not be split with an is_allocated()-driven loop. The current loop allows chunks unaligned to CHUNK.
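
A sketch of the distinction, simplified from the existing loop in block/stream.c:

    /* Allocation-driven chunking: 'n' is clamped at allocation
     * boundaries, so a copied region may start and end unaligned to
     * STREAM_CHUNK.  A fixed-size read loop would split such a region
     * into CHUNK-sized pieces instead. */
    ret = bdrv_is_allocated(unfiltered_bs, offset, STREAM_CHUNK, &n);
    if (ret == 0) {
        /* [offset, offset + n) is unallocated in the top image, copy
         * it as one request regardless of CHUNK alignment. */
        ret = stream_populate(blk, offset, n);
    }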

So, I think, it's better to keep is_allocated() logic as is for now.
Andrey Shinkevich Oct. 20, 2020, 7:43 p.m. UTC | #5
On 16.10.2020 18:45, Vladimir Sementsov-Ogievskiy wrote:
> 15.10.2020 20:16, Andrey Shinkevich wrote:
>> On 14.10.2020 19:24, Max Reitz wrote:
>>> On 12.10.20 19:43, Andrey Shinkevich wrote:
>>
>> [...]
>>
>>>> ---
>>>>   block/stream.c             | 93 
>>>> +++++++++++++++++++++++++++++-----------------
>>>>   tests/qemu-iotests/030     | 51 +++----------------------
>>>>   tests/qemu-iotests/030.out |  4 +-
>>>>   tests/qemu-iotests/141.out |  2 +-
>>>>   tests/qemu-iotests/245     | 19 +++++++---
>>>>   5 files changed, 81 insertions(+), 88 deletions(-)
>>>
>>> Looks like stream_run() could be a bit streamlined now (the allocation
>>> checking should be unnecessary, unconditionally calling
>>> stream_populate() should be sufficient), but not necessary now.
>>>
>>
>> That is what I had kept in mind when I tackled this patch. But
>> there is a hidden reef in streamlining it: how does the
>> block-stream job learn about a long unallocated tail so that it can
>> exit the loop in stream_run() early? Shall we return '-EOF' or
>> another error code from cor_co_preadv_part() to be handled by
>> stream_run()? Any other suggestions will be appreciated.
> 
> Just reading CHUNK by CHUNK may be less efficient than an
> is_allocated()-driven loop: you may end up splitting regions that are
> unaligned to the CHUNK granularity, which would not be split with an
> is_allocated()-driven loop. The current loop allows chunks unaligned to CHUNK.

cor_co_preadv_part() will check for the end of file in the next
version, so the misalignment is not going to be an issue.
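
A hypothetical shape of that check inside cor_co_preadv_part(); this
is an assumption about the next version, not code from this series:

    int64_t size = bdrv_getlength(bs->file->bs);
    if (size < 0) {
        return size;                    /* propagate the error */
    }
    if (offset >= size) {
        return -ENODATA;                /* hypothetical early-exit signal */
    }
    bytes = MIN(bytes, size - offset);  /* clamp the prefetch at EOF */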

Andrey

> 
> So, I think, it's better to keep is_allocated() logic as is for now.
> 
> 
>

Patch

diff --git a/block/stream.c b/block/stream.c
index d3e1812..93564db 100644
--- a/block/stream.c
+++ b/block/stream.c
@@ -17,8 +17,10 @@ 
 #include "block/blockjob_int.h"
 #include "qapi/error.h"
 #include "qapi/qmp/qerror.h"
+#include "qapi/qmp/qdict.h"
 #include "qemu/ratelimit.h"
 #include "sysemu/block-backend.h"
+#include "block/copy-on-read.h"
 
 enum {
     /*
@@ -33,6 +35,8 @@  typedef struct StreamBlockJob {
     BlockJob common;
     BlockDriverState *base_overlay; /* COW overlay (stream from this) */
     BlockDriverState *above_base;   /* Node directly above the base */
+    BlockDriverState *cor_filter_bs;
+    BlockDriverState *target_bs;
     BlockdevOnError on_error;
     bool bs_read_only;
     bool chain_frozen;
@@ -43,8 +47,7 @@  static int coroutine_fn stream_populate(BlockBackend *blk,
 {
     assert(bytes < SIZE_MAX);
 
-    return blk_co_preadv(blk, offset, bytes, NULL,
-                         BDRV_REQ_COPY_ON_READ | BDRV_REQ_PREFETCH);
+    return blk_co_preadv(blk, offset, bytes, NULL, BDRV_REQ_PREFETCH);
 }
 
 static void stream_abort(Job *job)
@@ -52,23 +55,20 @@  static void stream_abort(Job *job)
     StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
 
     if (s->chain_frozen) {
-        BlockJob *bjob = &s->common;
-        bdrv_unfreeze_backing_chain(blk_bs(bjob->blk), s->above_base);
+        bdrv_unfreeze_backing_chain(s->cor_filter_bs, s->above_base);
     }
 }
 
 static int stream_prepare(Job *job)
 {
     StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
-    BlockJob *bjob = &s->common;
-    BlockDriverState *bs = blk_bs(bjob->blk);
-    BlockDriverState *unfiltered_bs = bdrv_skip_filters(bs);
+    BlockDriverState *unfiltered_bs = bdrv_skip_filters(s->target_bs);
     BlockDriverState *base = bdrv_filter_or_cow_bs(s->above_base);
     BlockDriverState *base_unfiltered = bdrv_skip_filters(base);
     Error *local_err = NULL;
     int ret = 0;
 
-    bdrv_unfreeze_backing_chain(bs, s->above_base);
+    bdrv_unfreeze_backing_chain(s->cor_filter_bs, s->above_base);
     s->chain_frozen = false;
 
     if (bdrv_cow_child(unfiltered_bs)) {
@@ -94,13 +94,14 @@  static void stream_clean(Job *job)
 {
     StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
     BlockJob *bjob = &s->common;
-    BlockDriverState *bs = blk_bs(bjob->blk);
+
+    bdrv_cor_filter_drop(s->cor_filter_bs);
 
     /* Reopen the image back in read-only mode if necessary */
     if (s->bs_read_only) {
         /* Give up write permissions before making it read-only */
         blk_set_perm(bjob->blk, 0, BLK_PERM_ALL, &error_abort);
-        bdrv_reopen_set_read_only(bs, true, NULL);
+        bdrv_reopen_set_read_only(s->target_bs, true, NULL);
     }
 }
 
@@ -108,9 +109,7 @@  static int coroutine_fn stream_run(Job *job, Error **errp)
 {
     StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
     BlockBackend *blk = s->common.blk;
-    BlockDriverState *bs = blk_bs(blk);
-    BlockDriverState *unfiltered_bs = bdrv_skip_filters(bs);
-    bool enable_cor = !bdrv_cow_child(s->base_overlay);
+    BlockDriverState *unfiltered_bs = bdrv_skip_filters(s->target_bs);
     int64_t len;
     int64_t offset = 0;
     uint64_t delay_ns = 0;
@@ -122,21 +121,12 @@  static int coroutine_fn stream_run(Job *job, Error **errp)
         return 0;
     }
 
-    len = bdrv_getlength(bs);
+    len = bdrv_getlength(s->target_bs);
     if (len < 0) {
         return len;
     }
     job_progress_set_remaining(&s->common.job, len);
 
-    /* Turn on copy-on-read for the whole block device so that guest read
-     * requests help us make progress.  Only do this when copying the entire
-     * backing chain since the copy-on-read operation does not take base into
-     * account.
-     */
-    if (enable_cor) {
-        bdrv_enable_copy_on_read(bs);
-    }
-
     for ( ; offset < len; offset += n) {
         bool copy;
         int ret;
@@ -195,10 +185,6 @@  static int coroutine_fn stream_run(Job *job, Error **errp)
         }
     }
 
-    if (enable_cor) {
-        bdrv_disable_copy_on_read(bs);
-    }
-
     /* Do not remove the backing file if an error was there but ignored. */
     return error;
 }
@@ -228,6 +214,7 @@  void stream_start(const char *job_id, BlockDriverState *bs,
     bool bs_read_only;
     int basic_flags = BLK_PERM_CONSISTENT_READ | BLK_PERM_WRITE_UNCHANGED;
     BlockDriverState *base_overlay = bdrv_find_overlay(bs, base);
+    BlockDriverState *cor_filter_bs = NULL;
     BlockDriverState *above_base;
 
     if (!base_overlay) {
@@ -262,17 +249,48 @@  void stream_start(const char *job_id, BlockDriverState *bs,
         }
     }
 
-    /* Prevent concurrent jobs trying to modify the graph structure here, we
-     * already have our own plans. Also don't allow resize as the image size is
-     * queried only at the job start and then cached. */
-    s = block_job_create(job_id, &stream_job_driver, NULL, bs,
-                         basic_flags | BLK_PERM_GRAPH_MOD,
-                         basic_flags | BLK_PERM_WRITE,
+    QDict *opts = qdict_new();
+
+    qdict_put_str(opts, "driver", "copy-on-read");
+    qdict_put_str(opts, "file", bdrv_get_node_name(bs));
+    if (base_overlay) {
+        /* Pass the base_overlay rather than base */
+        qdict_put_str(opts, "base", base_overlay->node_name);
+    }
+    if (filter_node_name) {
+        qdict_put_str(opts, "node-name", filter_node_name);
+    }
+
+    cor_filter_bs = bdrv_cor_filter_append(bs, opts, BDRV_O_RDWR, errp);
+    if (cor_filter_bs == NULL) {
+        goto fail;
+    }
+
+    if (bdrv_freeze_backing_chain(cor_filter_bs, bs, errp) < 0) {
+        bdrv_cor_filter_drop(cor_filter_bs);
+        cor_filter_bs = NULL;
+        goto fail;
+    }
+
+    s = block_job_create(job_id, &stream_job_driver, NULL, cor_filter_bs,
+                         BLK_PERM_CONSISTENT_READ,
+                         basic_flags | BLK_PERM_WRITE | BLK_PERM_GRAPH_MOD,
                          speed, creation_flags, NULL, NULL, errp);
     if (!s) {
         goto fail;
     }
 
+    /*
+     * Prevent concurrent jobs trying to modify the graph structure here, we
+     * already have our own plans. Also don't allow resize as the image size is
+     * queried only at the job start and then cached.
+     */
+    if (block_job_add_bdrv(&s->common, "active node", bs,
+                           basic_flags | BLK_PERM_GRAPH_MOD,
+                           basic_flags | BLK_PERM_WRITE, &error_abort)) {
+        goto fail;
+    }
+
     /* Block all intermediate nodes between bs and base, because they will
      * disappear from the chain after this operation. The streaming job reads
      * every block only once, assuming that it doesn't change, so forbid writes
@@ -292,6 +310,8 @@  void stream_start(const char *job_id, BlockDriverState *bs,
 
     s->base_overlay = base_overlay;
     s->above_base = above_base;
+    s->cor_filter_bs = cor_filter_bs;
+    s->target_bs = bs;
     s->bs_read_only = bs_read_only;
     s->chain_frozen = true;
 
@@ -304,5 +324,10 @@  fail:
     if (bs_read_only) {
         bdrv_reopen_set_read_only(bs, true, NULL);
     }
-    bdrv_unfreeze_backing_chain(bs, above_base);
+    if (cor_filter_bs) {
+        bdrv_unfreeze_backing_chain(cor_filter_bs, above_base);
+        bdrv_cor_filter_drop(cor_filter_bs);
+    } else {
+        bdrv_unfreeze_backing_chain(bs, above_base);
+    }
 }
diff --git a/tests/qemu-iotests/030 b/tests/qemu-iotests/030
index dcb4b5d..0064590 100755
--- a/tests/qemu-iotests/030
+++ b/tests/qemu-iotests/030
@@ -227,61 +227,20 @@  class TestParallelOps(iotests.QMPTestCase):
         for img in self.imgs:
             os.remove(img)
 
-    # Test that it's possible to run several block-stream operations
-    # in parallel in the same snapshot chain
-    @unittest.skipIf(os.environ.get('QEMU_CHECK_BLOCK_AUTO'), 'disabled in CI')
-    def test_stream_parallel(self):
-        self.assert_no_active_block_jobs()
-
-        # Check that the maps don't match before the streaming operations
-        for i in range(2, self.num_imgs, 2):
-            self.assertNotEqual(qemu_io('-f', iotests.imgfmt, '-rU', '-c', 'map', self.imgs[i]),
-                                qemu_io('-f', iotests.imgfmt, '-rU', '-c', 'map', self.imgs[i-1]),
-                                'image file map matches backing file before streaming')
-
-        # Create all streaming jobs
-        pending_jobs = []
-        for i in range(2, self.num_imgs, 2):
-            node_name = 'node%d' % i
-            job_id = 'stream-%s' % node_name
-            pending_jobs.append(job_id)
-            result = self.vm.qmp('block-stream', device=node_name, job_id=job_id, base=self.imgs[i-2], speed=1024)
-            self.assert_qmp(result, 'return', {})
-
-        for job in pending_jobs:
-            result = self.vm.qmp('block-job-set-speed', device=job, speed=0)
-            self.assert_qmp(result, 'return', {})
-
-        # Wait for all jobs to be finished.
-        while len(pending_jobs) > 0:
-            for event in self.vm.get_qmp_events(wait=True):
-                if event['event'] == 'BLOCK_JOB_COMPLETED':
-                    job_id = self.dictpath(event, 'data/device')
-                    self.assertTrue(job_id in pending_jobs)
-                    self.assert_qmp_absent(event, 'data/error')
-                    pending_jobs.remove(job_id)
-
-        self.assert_no_active_block_jobs()
-        self.vm.shutdown()
-
-        # Check that all maps match now
-        for i in range(2, self.num_imgs, 2):
-            self.assertEqual(qemu_io('-f', iotests.imgfmt, '-c', 'map', self.imgs[i]),
-                             qemu_io('-f', iotests.imgfmt, '-c', 'map', self.imgs[i-1]),
-                             'image file map does not match backing file after streaming')
-
     # Test that it's not possible to perform two block-stream
     # operations if there are nodes involved in both.
     def test_overlapping_1(self):
         self.assert_no_active_block_jobs()
 
         # Set a speed limit to make sure that this job blocks the rest
-        result = self.vm.qmp('block-stream', device='node4', job_id='stream-node4', base=self.imgs[1], speed=1024*1024)
+        result = self.vm.qmp('block-stream', device='node4',
+                             job_id='stream-node4', base=self.imgs[1],
+                             filter_node_name='stream-filter', speed=1024*1024)
         self.assert_qmp(result, 'return', {})
 
         result = self.vm.qmp('block-stream', device='node5', job_id='stream-node5', base=self.imgs[2])
         self.assert_qmp(result, 'error/desc',
-            "Node 'node4' is busy: block device is in use by block job: stream")
+            "Node 'stream-filter' is busy: block device is in use by block job: stream")
 
         result = self.vm.qmp('block-stream', device='node3', job_id='stream-node3', base=self.imgs[2])
         self.assert_qmp(result, 'error/desc',
@@ -294,7 +253,7 @@  class TestParallelOps(iotests.QMPTestCase):
         # block-commit should also fail if it touches nodes used by the stream job
         result = self.vm.qmp('block-commit', device='drive0', base=self.imgs[4], job_id='commit-node4')
         self.assert_qmp(result, 'error/desc',
-            "Node 'node4' is busy: block device is in use by block job: stream")
+            "Node 'stream-filter' is busy: block device is in use by block job: stream")
 
         result = self.vm.qmp('block-commit', device='drive0', base=self.imgs[1], top=self.imgs[3], job_id='commit-node1')
         self.assert_qmp(result, 'error/desc',
diff --git a/tests/qemu-iotests/030.out b/tests/qemu-iotests/030.out
index 6d9bee1..5eb508d 100644
--- a/tests/qemu-iotests/030.out
+++ b/tests/qemu-iotests/030.out
@@ -1,5 +1,5 @@ 
-...........................
+..........................
 ----------------------------------------------------------------------
-Ran 27 tests
+Ran 26 tests
 
 OK
diff --git a/tests/qemu-iotests/141.out b/tests/qemu-iotests/141.out
index 08e0aec..028a16f 100644
--- a/tests/qemu-iotests/141.out
+++ b/tests/qemu-iotests/141.out
@@ -99,7 +99,7 @@  wrote 1048576/1048576 bytes at offset 0
 {"timestamp": {"seconds":  TIMESTAMP, "microseconds":  TIMESTAMP}, "event": "JOB_STATUS_CHANGE", "data": {"status": "created", "id": "job0"}}
 {"timestamp": {"seconds":  TIMESTAMP, "microseconds":  TIMESTAMP}, "event": "JOB_STATUS_CHANGE", "data": {"status": "running", "id": "job0"}}
 {'execute': 'blockdev-del', 'arguments': {'node-name': 'drv0'}}
-{"error": {"class": "GenericError", "desc": "Node drv0 is in use"}}
+{"error": {"class": "GenericError", "desc": "Node 'drv0' is busy: block device is in use by block job: stream"}}
 {'execute': 'block-job-cancel', 'arguments': {'device': 'job0'}}
 {"return": {}}
 {"timestamp": {"seconds":  TIMESTAMP, "microseconds":  TIMESTAMP}, "event": "JOB_STATUS_CHANGE", "data": {"status": "aborting", "id": "job0"}}
diff --git a/tests/qemu-iotests/245 b/tests/qemu-iotests/245
index e60c832..940e85a 100755
--- a/tests/qemu-iotests/245
+++ b/tests/qemu-iotests/245
@@ -899,17 +899,26 @@  class TestBlockdevReopen(iotests.QMPTestCase):
         # make hd1 read-only and block-stream requires it to be read-write
         # (Which error message appears depends on whether the stream job is
         # already done with copying at this point.)
-        self.reopen(opts, {},
+        # As the COR-filter node is inserted into the backing chain with the
+        # 'block-stream' operation, we move the options to their proper nodes.
+        opts = hd_opts(1)
+        opts['backing'] = hd_opts(2)
+        opts['backing']['backing'] = None
+        self.reopen(opts, {'read-only': True},
             ["Can't set node 'hd1' to r/o with copy-on-read enabled",
              "Cannot make block node read-only, there is a writer on it"])
 
         # We can't remove hd2 while the stream job is ongoing
-        opts['backing']['backing'] = None
-        self.reopen(opts, {'backing.read-only': False}, "Cannot change 'backing' link from 'hd1' to 'hd2'")
+        opts['backing'] = None
+        self.reopen(opts, {'read-only': False},
+                    "Cannot change 'backing' link from 'hd1' to 'hd2'")
 
-        # We can detach hd1 from hd0 because it doesn't affect the stream job
+        # We can't detach hd1 from hd0 because there is the COR-filter implicit
+        # node in between.
+        opts = hd_opts(0)
         opts['backing'] = None
-        self.reopen(opts)
+        self.reopen(opts, {},
+                    "Cannot change backing link if 'hd0' has an implicit backing file")
 
         self.vm.run_job('stream0', auto_finalize = False, auto_dismiss = True)