diff mbox

[3/8] migration: support to detect compression and decompression errors

Message ID 20180313075739.11194-4-xiaoguangrong@tencent.com (mailing list archive)
State New, archived
Headers show

Commit Message

Xiao Guangrong March 13, 2018, 7:57 a.m. UTC
From: Xiao Guangrong <xiaoguangrong@tencent.com>

Currently the page being compressed is allowed to be updated by
the VM on the source QEMU, correspondingly the destination QEMU
just ignores the decompression error. However, we completely miss
the chance to catch real errors, then the VM is corrupted silently

To make the migration more robust, we copy the page to a buffer
first to avoid it being written by the VM, then properly detect and
handle both compression and decompression errors

Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
---
 migration/qemu-file.c |  4 ++--
 migration/ram.c       | 29 +++++++++++++++++++----------
 2 files changed, 21 insertions(+), 12 deletions(-)

Comments

Dr. David Alan Gilbert March 15, 2018, 11:29 a.m. UTC | #1
* guangrong.xiao@gmail.com (guangrong.xiao@gmail.com) wrote:
> From: Xiao Guangrong <xiaoguangrong@tencent.com>
> 
> Currently the page being compressed is allowed to be updated by
> the VM on the source QEMU, correspondingly the destination QEMU
> just ignores the decompression error. However, we completely miss
> the chance to catch real errors, then the VM is corrupted silently
> 
> To make the migration more robuster, we copy the page to a buffer
> first to avoid it being written by VM, then detect and handle the
> errors of both compression and decompression errors properly
> 
> Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
> ---
>  migration/qemu-file.c |  4 ++--
>  migration/ram.c       | 29 +++++++++++++++++++----------
>  2 files changed, 21 insertions(+), 12 deletions(-)
> 
> diff --git a/migration/qemu-file.c b/migration/qemu-file.c
> index 1ff33a1ffb..137bcc8bdc 100644
> --- a/migration/qemu-file.c
> +++ b/migration/qemu-file.c
> @@ -711,9 +711,9 @@ ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
>      blen = qemu_compress_data(stream, f->buf + f->buf_index + sizeof(int32_t),
>                                blen, p, size);
>      if (blen < 0) {
> -        error_report("Compress Failed!");
> -        return 0;
> +        return -1;
>      }
> +
>      qemu_put_be32(f, blen);
>      if (f->ops->writev_buffer) {
>          add_to_iovec(f, f->buf + f->buf_index, blen, false);
> diff --git a/migration/ram.c b/migration/ram.c
> index fff3f31e90..c47185d38c 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -273,6 +273,7 @@ struct DecompressParam {
>      bool quit;
>      QemuMutex mutex;
>      QemuCond cond;
> +    QEMUFile *file;
>      void *des;
>      uint8_t *compbuf;
>      int len;
> @@ -1051,11 +1052,13 @@ static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
>  {
>      RAMState *rs = ram_state;
>      int bytes_sent, blen;
> -    uint8_t *p = block->host + (offset & TARGET_PAGE_MASK);
> +    uint8_t buf[TARGET_PAGE_SIZE], *p;

That should be malloc'd somewhere rather than be on the stack; it's a
bit big and also there are architectures where TARGET_PAGE_SIZE isn't
compile time constant.

(Also, please use g_try_malloc rather than g_malloc on larger chunks,
since g_try_malloc will return NULL so you can fail nicely;  g_malloc is
OK for small things that are very unlikely to fail).

Other than that, I think the patch is fine.

Dave

> +    p = block->host + (offset & TARGET_PAGE_MASK);
>      bytes_sent = save_page_header(rs, f, block, offset |
>                                    RAM_SAVE_FLAG_COMPRESS_PAGE);
> -    blen = qemu_put_compression_data(f, stream, p, TARGET_PAGE_SIZE);
> +    memcpy(buf, p, TARGET_PAGE_SIZE);
> +    blen = qemu_put_compression_data(f, stream, buf, TARGET_PAGE_SIZE);
>      if (blen < 0) {
>          bytes_sent = 0;
>          qemu_file_set_error(migrate_get_current()->to_dst_file, blen);
> @@ -2547,7 +2550,7 @@ static void *do_data_decompress(void *opaque)
>      DecompressParam *param = opaque;
>      unsigned long pagesize;
>      uint8_t *des;
> -    int len;
> +    int len, ret;
>  
>      qemu_mutex_lock(&param->mutex);
>      while (!param->quit) {
> @@ -2563,8 +2566,12 @@ static void *do_data_decompress(void *opaque)
>               * not a problem because the dirty page will be retransferred
>               * and uncompress() won't break the data in other pages.
>               */
> -            qemu_uncompress(&param->stream, des, pagesize,
> -                            param->compbuf, len);
> +            ret = qemu_uncompress(&param->stream, des, pagesize,
> +                                  param->compbuf, len);
> +            if (ret < 0) {
> +                error_report("decompress data failed");
> +                qemu_file_set_error(param->file, ret);
> +            }
>  
>              qemu_mutex_lock(&decomp_done_lock);
>              param->done = true;
> @@ -2581,12 +2588,12 @@ static void *do_data_decompress(void *opaque)
>      return NULL;
>  }
>  
> -static void wait_for_decompress_done(void)
> +static int wait_for_decompress_done(QEMUFile *f)
>  {
>      int idx, thread_count;
>  
>      if (!migrate_use_compression()) {
> -        return;
> +        return 0;
>      }
>  
>      thread_count = migrate_decompress_threads();
> @@ -2597,6 +2604,7 @@ static void wait_for_decompress_done(void)
>          }
>      }
>      qemu_mutex_unlock(&decomp_done_lock);
> +    return qemu_file_get_error(f);
>  }
>  
>  static void compress_threads_load_cleanup(void)
> @@ -2635,7 +2643,7 @@ static void compress_threads_load_cleanup(void)
>      decomp_param = NULL;
>  }
>  
> -static int compress_threads_load_setup(void)
> +static int compress_threads_load_setup(QEMUFile *f)
>  {
>      int i, thread_count;
>  
> @@ -2654,6 +2662,7 @@ static int compress_threads_load_setup(void)
>          }
>          decomp_param[i].stream.opaque = &decomp_param[i];
>  
> +        decomp_param[i].file = f;
>          qemu_mutex_init(&decomp_param[i].mutex);
>          qemu_cond_init(&decomp_param[i].cond);
>          decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
> @@ -2708,7 +2717,7 @@ static void decompress_data_with_multi_threads(QEMUFile *f,
>   */
>  static int ram_load_setup(QEMUFile *f, void *opaque)
>  {
> -    if (compress_threads_load_setup()) {
> +    if (compress_threads_load_setup(f)) {
>          return -1;
>      }
>  
> @@ -3063,7 +3072,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>          }
>      }
>  
> -    wait_for_decompress_done();
> +    ret |= wait_for_decompress_done(f);
>      rcu_read_unlock();
>      trace_ram_load_complete(ret, seq_iter);
>      return ret;
> -- 
> 2.14.3
> 
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Xiao Guangrong March 16, 2018, 8:25 a.m. UTC | #2
On 03/15/2018 07:29 PM, Dr. David Alan Gilbert wrote:

>> @@ -1051,11 +1052,13 @@ static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
>>   {
>>       RAMState *rs = ram_state;
>>       int bytes_sent, blen;
>> -    uint8_t *p = block->host + (offset & TARGET_PAGE_MASK);
>> +    uint8_t buf[TARGET_PAGE_SIZE], *p;
> 
> That should be malloc'd somewhere rather than be on the stack; it's a
> bit big and also there are architectures where TARGET_PAGE_SIZE isn't
> compile time constant.
> 

Okay, I will allocate an internal buffer for each thread...

> (Also, please use g_try_malloc rather than g_malloc on larger chunks,
> since g_try_malloc will return NULL so you can fail nicely;  g_malloc is
> OK for small things that are very unlikely to fail).
> 
> Other than that, I think the patch is fine.

Thank you, Dave!
Peter Xu March 21, 2018, 10 a.m. UTC | #3
On Tue, Mar 13, 2018 at 03:57:34PM +0800, guangrong.xiao@gmail.com wrote:
> From: Xiao Guangrong <xiaoguangrong@tencent.com>
> 
> Currently the page being compressed is allowed to be updated by
> the VM on the source QEMU, correspondingly the destination QEMU
> just ignores the decompression error. However, we completely miss
> the chance to catch real errors, then the VM is corrupted silently
> 
> To make the migration more robuster, we copy the page to a buffer
> first to avoid it being written by VM, then detect and handle the
> errors of both compression and decompression errors properly

Not sure I missed anything important, but I'll just shoot my thoughts
as questions (again)...

Actually this is a more general question? Say, even without
compression, we can be sending a page that is being modified.

However, IMHO we don't need to worry that, since if that page is
modified, we'll definitely send that page again, so the new page will
replace the old.  So on destination side, even if decompress() failed
on a page it'll be fine IMHO.  Though now we are copying the corrupted
buffer.  On that point, I fully agree that we should not - maybe we
can just drop the page entirely?

For non-compress pages, we can't detect that, so we'll copy the page
even if corrupted.

The special part for compression would be: would the deflate() fail if
there is concurrent update to the buffer being compressed?  And would
that corrupt the whole compression stream, or it would only fail the
deflate() call?

Thanks,

> 
> Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
> ---
>  migration/qemu-file.c |  4 ++--
>  migration/ram.c       | 29 +++++++++++++++++++----------
>  2 files changed, 21 insertions(+), 12 deletions(-)
> 
> diff --git a/migration/qemu-file.c b/migration/qemu-file.c
> index 1ff33a1ffb..137bcc8bdc 100644
> --- a/migration/qemu-file.c
> +++ b/migration/qemu-file.c
> @@ -711,9 +711,9 @@ ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
>      blen = qemu_compress_data(stream, f->buf + f->buf_index + sizeof(int32_t),
>                                blen, p, size);
>      if (blen < 0) {
> -        error_report("Compress Failed!");
> -        return 0;
> +        return -1;
>      }
> +
>      qemu_put_be32(f, blen);
>      if (f->ops->writev_buffer) {
>          add_to_iovec(f, f->buf + f->buf_index, blen, false);
> diff --git a/migration/ram.c b/migration/ram.c
> index fff3f31e90..c47185d38c 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -273,6 +273,7 @@ struct DecompressParam {
>      bool quit;
>      QemuMutex mutex;
>      QemuCond cond;
> +    QEMUFile *file;
>      void *des;
>      uint8_t *compbuf;
>      int len;
> @@ -1051,11 +1052,13 @@ static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
>  {
>      RAMState *rs = ram_state;
>      int bytes_sent, blen;
> -    uint8_t *p = block->host + (offset & TARGET_PAGE_MASK);
> +    uint8_t buf[TARGET_PAGE_SIZE], *p;
>  
> +    p = block->host + (offset & TARGET_PAGE_MASK);
>      bytes_sent = save_page_header(rs, f, block, offset |
>                                    RAM_SAVE_FLAG_COMPRESS_PAGE);
> -    blen = qemu_put_compression_data(f, stream, p, TARGET_PAGE_SIZE);
> +    memcpy(buf, p, TARGET_PAGE_SIZE);
> +    blen = qemu_put_compression_data(f, stream, buf, TARGET_PAGE_SIZE);
>      if (blen < 0) {
>          bytes_sent = 0;
>          qemu_file_set_error(migrate_get_current()->to_dst_file, blen);
> @@ -2547,7 +2550,7 @@ static void *do_data_decompress(void *opaque)
>      DecompressParam *param = opaque;
>      unsigned long pagesize;
>      uint8_t *des;
> -    int len;
> +    int len, ret;
>  
>      qemu_mutex_lock(&param->mutex);
>      while (!param->quit) {
> @@ -2563,8 +2566,12 @@ static void *do_data_decompress(void *opaque)
>               * not a problem because the dirty page will be retransferred
>               * and uncompress() won't break the data in other pages.
>               */
> -            qemu_uncompress(&param->stream, des, pagesize,
> -                            param->compbuf, len);
> +            ret = qemu_uncompress(&param->stream, des, pagesize,
> +                                  param->compbuf, len);
> +            if (ret < 0) {
> +                error_report("decompress data failed");
> +                qemu_file_set_error(param->file, ret);
> +            }
>  
>              qemu_mutex_lock(&decomp_done_lock);
>              param->done = true;
> @@ -2581,12 +2588,12 @@ static void *do_data_decompress(void *opaque)
>      return NULL;
>  }
>  
> -static void wait_for_decompress_done(void)
> +static int wait_for_decompress_done(QEMUFile *f)
>  {
>      int idx, thread_count;
>  
>      if (!migrate_use_compression()) {
> -        return;
> +        return 0;
>      }
>  
>      thread_count = migrate_decompress_threads();
> @@ -2597,6 +2604,7 @@ static void wait_for_decompress_done(void)
>          }
>      }
>      qemu_mutex_unlock(&decomp_done_lock);
> +    return qemu_file_get_error(f);
>  }
>  
>  static void compress_threads_load_cleanup(void)
> @@ -2635,7 +2643,7 @@ static void compress_threads_load_cleanup(void)
>      decomp_param = NULL;
>  }
>  
> -static int compress_threads_load_setup(void)
> +static int compress_threads_load_setup(QEMUFile *f)
>  {
>      int i, thread_count;
>  
> @@ -2654,6 +2662,7 @@ static int compress_threads_load_setup(void)
>          }
>          decomp_param[i].stream.opaque = &decomp_param[i];
>  
> +        decomp_param[i].file = f;
>          qemu_mutex_init(&decomp_param[i].mutex);
>          qemu_cond_init(&decomp_param[i].cond);
>          decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
> @@ -2708,7 +2717,7 @@ static void decompress_data_with_multi_threads(QEMUFile *f,
>   */
>  static int ram_load_setup(QEMUFile *f, void *opaque)
>  {
> -    if (compress_threads_load_setup()) {
> +    if (compress_threads_load_setup(f)) {
>          return -1;
>      }
>  
> @@ -3063,7 +3072,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>          }
>      }
>  
> -    wait_for_decompress_done();
> +    ret |= wait_for_decompress_done(f);
>      rcu_read_unlock();
>      trace_ram_load_complete(ret, seq_iter);
>      return ret;
> -- 
> 2.14.3
> 
>
Xiao Guangrong March 22, 2018, 12:03 p.m. UTC | #4
On 03/21/2018 06:00 PM, Peter Xu wrote:
> On Tue, Mar 13, 2018 at 03:57:34PM +0800, guangrong.xiao@gmail.com wrote:
>> From: Xiao Guangrong <xiaoguangrong@tencent.com>
>>
>> Currently the page being compressed is allowed to be updated by
>> the VM on the source QEMU, correspondingly the destination QEMU
>> just ignores the decompression error. However, we completely miss
>> the chance to catch real errors, then the VM is corrupted silently
>>
>> To make the migration more robuster, we copy the page to a buffer
>> first to avoid it being written by VM, then detect and handle the
>> errors of both compression and decompression errors properly
> 
> Not sure I missed anything important, but I'll just shoot my thoughts
> as questions (again)...
> 
> Actually this is a more general question? Say, even without
> compression, we can be sending a page that is being modified.
> 
> However, IMHO we don't need to worry that, since if that page is
> modified, we'll definitely send that page again, so the new page will
> replace the old.  So on destination side, even if decompress() failed
> on a page it'll be fine IMHO.  Though now we are copying the corrupted
> buffer.  On that point, I fully agree that we should not - maybe we
> can just drop the page entirely?
> 
> For non-compress pages, we can't detect that, so we'll copy the page
> even if corrupted.
> 
> The special part for compression would be: would the deflate() fail if
> there is concurrent update to the buffer being compressed?  And would
> that corrupt the whole compression stream, or it would only fail the
> deflate() call?

It is not the same for normal page and compressed page.

For the normal page, the dirty-log mechanism in QEMU and the infrastructure
of the network (e.g, TCP) can make sure that the modified memory will
be posted to the destination without corruption.

However, nothing can guarantee compression/decompression is BUG-free,
e.g., consider the case where, in the last step, vCPUs & dirty-log are
paused and the memory is compressed and posted to the destination; if there
is any error in compression/decompression, the VM dies silently.
Xiao Guangrong March 26, 2018, 7:42 p.m. UTC | #5
On 03/27/2018 03:22 PM, Peter Xu wrote:
> On Thu, Mar 22, 2018 at 08:03:53PM +0800, Xiao Guangrong wrote:
>>
>>
>> On 03/21/2018 06:00 PM, Peter Xu wrote:
>>> On Tue, Mar 13, 2018 at 03:57:34PM +0800, guangrong.xiao@gmail.com wrote:
>>>> From: Xiao Guangrong <xiaoguangrong@tencent.com>
>>>>
>>>> Currently the page being compressed is allowed to be updated by
>>>> the VM on the source QEMU, correspondingly the destination QEMU
>>>> just ignores the decompression error. However, we completely miss
>>>> the chance to catch real errors, then the VM is corrupted silently
>>>>
>>>> To make the migration more robuster, we copy the page to a buffer
>>>> first to avoid it being written by VM, then detect and handle the
>>>> errors of both compression and decompression errors properly
>>>
>>> Not sure I missed anything important, but I'll just shoot my thoughts
>>> as questions (again)...
>>>
>>> Actually this is a more general question? Say, even without
>>> compression, we can be sending a page that is being modified.
>>>
>>> However, IMHO we don't need to worry that, since if that page is
>>> modified, we'll definitely send that page again, so the new page will
>>> replace the old.  So on destination side, even if decompress() failed
>>> on a page it'll be fine IMHO.  Though now we are copying the corrupted
>>> buffer.  On that point, I fully agree that we should not - maybe we
>>> can just drop the page entirely?
>>>
>>> For non-compress pages, we can't detect that, so we'll copy the page
>>> even if corrupted.
>>>
>>> The special part for compression would be: would the deflate() fail if
>>> there is concurrent update to the buffer being compressed?  And would
>>> that corrupt the whole compression stream, or it would only fail the
>>> deflate() call?
>>
>> It is not the same for normal page and compressed page.
>>
>> For the normal page, the dirty-log mechanism in QEMU and the infrastructure
>> of the network (e.g, TCP) can make sure that the modified memory will
>> be posted to the destination without corruption.
>>
>> However, nothing can guarantee compression/decompression is BUG-free,
>> e,g, consider the case, in the last step, vCPUs & dirty-log are paused and
>> the memory is compressed and posted to destination, if there is any error
>> in compression/decompression, VM dies silently.
> 
> Here do you mean the compression error even if the VM is halted?  I'd
> say in that case IMHO the extra memcpy() would still help little since
> the coiped page should exactly be the same as the source page?

"compression error" means that compress2() in the original code returns an
error code.

If the data being compressed is being modified at the same time,
compression will fail, and this failure is spurious. We move the data to
an internal buffer to avoid this case, so that we can catch the real
error condition.

> 
> I'd say I don't know what we can really do if there are zlib bugs. I
> was assuming we'll definitely fail in a strange way if there is any,
> which should be hard to be detected from QEMU's POV (maybe a
> destination VM crash, as you mentioned).  It'll be easy for us to
> detect errors when we got error code returned from compress(), however
> IMHO when we say "zlib bug" it can also mean that data is corrputed
> even compress() and decompress() both returned with good state.
> 

Ah, sorry, I misused the word "BUG".

It does not mean a bug in the compression/decompression API; I mean the
failure conditions (the API returns an error code).

> It'll be understandable to me if the problem is that the compress()
> API does not allow the input buffer to be changed during the whole
> period of the call.  If that is a must, this patch for sure helps.

Yes, that is exactly what I want to say. :)
Xiao Guangrong March 27, 2018, 1:20 a.m. UTC | #6
On 03/27/2018 07:17 PM, Peter Xu wrote:
> On Tue, Mar 27, 2018 at 03:42:32AM +0800, Xiao Guangrong wrote:
> 
> [...]
> 
>>> It'll be understandable to me if the problem is that the compress()
>>> API does not allow the input buffer to be changed during the whole
>>> period of the call.  If that is a must, this patch for sure helps.
>>
>> Yes, that is exactly what i want to say. :)
> 
> So I think now I know what this patch is for. :) And yeah, it makes
> sense.
> 
> Though another question would be: if the buffer is updated during
> compress() and compress() returned error, would that pollute the whole
> z_stream or it only fails the compress() call?
> 

I guess deflateReset() can recover everything, i.e., restore z_stream to
the state it was in right after deflateInit().

> (Same question applies to decompress().)
> 
> If it's only a compress() error and it won't pollute z_stream (or say,
> it can be recovered after a deflateReset() and then we can continue to
> call deflate() without problem), then we'll actually have two
> alternatives to solve this "buffer update" issue:
> 
> 1. Use the approach of current patch: we copy the page every time, so
>     deflate() never fails because update never happens.  But it's slow
>     since we copy the pages every time.
> 
> 2. Use the old approach, and when compress() fail, we just ignore that
>     page (since now we know that error _must_ be caused by page update,
>     then we are 100% sure that we'll send that page again so it'll be
>     perfectly fine).
> 

No, we can't make the assumption that the "error _must_ be caused by a page update".
No document/ABI for compress/decompress promises it. :)

Thanks!
Peter Xu March 27, 2018, 7:22 a.m. UTC | #7
On Thu, Mar 22, 2018 at 08:03:53PM +0800, Xiao Guangrong wrote:
> 
> 
> On 03/21/2018 06:00 PM, Peter Xu wrote:
> > On Tue, Mar 13, 2018 at 03:57:34PM +0800, guangrong.xiao@gmail.com wrote:
> > > From: Xiao Guangrong <xiaoguangrong@tencent.com>
> > > 
> > > Currently the page being compressed is allowed to be updated by
> > > the VM on the source QEMU, correspondingly the destination QEMU
> > > just ignores the decompression error. However, we completely miss
> > > the chance to catch real errors, then the VM is corrupted silently
> > > 
> > > To make the migration more robuster, we copy the page to a buffer
> > > first to avoid it being written by VM, then detect and handle the
> > > errors of both compression and decompression errors properly
> > 
> > Not sure I missed anything important, but I'll just shoot my thoughts
> > as questions (again)...
> > 
> > Actually this is a more general question? Say, even without
> > compression, we can be sending a page that is being modified.
> > 
> > However, IMHO we don't need to worry that, since if that page is
> > modified, we'll definitely send that page again, so the new page will
> > replace the old.  So on destination side, even if decompress() failed
> > on a page it'll be fine IMHO.  Though now we are copying the corrupted
> > buffer.  On that point, I fully agree that we should not - maybe we
> > can just drop the page entirely?
> > 
> > For non-compress pages, we can't detect that, so we'll copy the page
> > even if corrupted.
> > 
> > The special part for compression would be: would the deflate() fail if
> > there is concurrent update to the buffer being compressed?  And would
> > that corrupt the whole compression stream, or it would only fail the
> > deflate() call?
> 
> It is not the same for normal page and compressed page.
> 
> For the normal page, the dirty-log mechanism in QEMU and the infrastructure
> of the network (e.g, TCP) can make sure that the modified memory will
> be posted to the destination without corruption.
> 
> However, nothing can guarantee compression/decompression is BUG-free,
> e,g, consider the case, in the last step, vCPUs & dirty-log are paused and
> the memory is compressed and posted to destination, if there is any error
> in compression/decompression, VM dies silently.

Here do you mean the compression error even if the VM is halted?  I'd
say in that case IMHO the extra memcpy() would still help little since
the copied page should be exactly the same as the source page?

I'd say I don't know what we can really do if there are zlib bugs. I
was assuming we'll definitely fail in a strange way if there is any,
which should be hard to be detected from QEMU's POV (maybe a
destination VM crash, as you mentioned).  It'll be easy for us to
detect errors when we get an error code returned from compress(); however,
IMHO when we say "zlib bug" it can also mean that data is corrupted
even though compress() and decompress() both returned with good state.

It'll be understandable to me if the problem is that the compress()
API does not allow the input buffer to be changed during the whole
period of the call.  If that is a must, this patch for sure helps.

Thanks,
Peter Xu March 27, 2018, 11:17 a.m. UTC | #8
On Tue, Mar 27, 2018 at 03:42:32AM +0800, Xiao Guangrong wrote:

[...]

> > It'll be understandable to me if the problem is that the compress()
> > API does not allow the input buffer to be changed during the whole
> > period of the call.  If that is a must, this patch for sure helps.
> 
> Yes, that is exactly what i want to say. :)

So I think now I know what this patch is for. :) And yeah, it makes
sense.

Though another question would be: if the buffer is updated during
compress() and compress() returned error, would that pollute the whole
z_stream or it only fails the compress() call?

(Same question applies to decompress().)

If it's only a compress() error and it won't pollute z_stream (or say,
it can be recovered after a deflateReset() and then we can continue to
call deflate() without problem), then we'll actually have two
alternatives to solve this "buffer update" issue:

1. Use the approach of current patch: we copy the page every time, so
   deflate() never fails because update never happens.  But it's slow
   since we copy the pages every time.

2. Use the old approach, and when compress() fail, we just ignore that
   page (since now we know that error _must_ be caused by page update,
   then we are 100% sure that we'll send that page again so it'll be
   perfectly fine).

If you see, IMHO method 2 has its advantage, since actually it
"detects" the page update operation by getting a failure in
compress(), then we don't really need to send that page at all (since
we'll send it later again, for sure).  Then, we not only saved the
memcpy() CPU time for every single page, meanwhile we might save some
bandwidth since we won't bother to send the page when we know the page
is modified.

But all these depend on the assumption that:

1. compress() will fail only because of buffer update, and

2. compress() failures won't pollute the whole z_stream.

Same thing would apply to decompress() side - we drop the corrupted
page (when decompress() returned errors) since we know another one
will come soon.

It's a bit tricky, but I'm still curious about it, since actually
that's mostly the old code before this patch except that we don't
really drop corrupted pages but still use them (which won't hurt
too much IMHO).

Thanks,
diff mbox

Patch

diff --git a/migration/qemu-file.c b/migration/qemu-file.c
index 1ff33a1ffb..137bcc8bdc 100644
--- a/migration/qemu-file.c
+++ b/migration/qemu-file.c
@@ -711,9 +711,9 @@  ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
     blen = qemu_compress_data(stream, f->buf + f->buf_index + sizeof(int32_t),
                               blen, p, size);
     if (blen < 0) {
-        error_report("Compress Failed!");
-        return 0;
+        return -1;
     }
+
     qemu_put_be32(f, blen);
     if (f->ops->writev_buffer) {
         add_to_iovec(f, f->buf + f->buf_index, blen, false);
diff --git a/migration/ram.c b/migration/ram.c
index fff3f31e90..c47185d38c 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -273,6 +273,7 @@  struct DecompressParam {
     bool quit;
     QemuMutex mutex;
     QemuCond cond;
+    QEMUFile *file;
     void *des;
     uint8_t *compbuf;
     int len;
@@ -1051,11 +1052,13 @@  static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
 {
     RAMState *rs = ram_state;
     int bytes_sent, blen;
-    uint8_t *p = block->host + (offset & TARGET_PAGE_MASK);
+    uint8_t buf[TARGET_PAGE_SIZE], *p;
 
+    p = block->host + (offset & TARGET_PAGE_MASK);
     bytes_sent = save_page_header(rs, f, block, offset |
                                   RAM_SAVE_FLAG_COMPRESS_PAGE);
-    blen = qemu_put_compression_data(f, stream, p, TARGET_PAGE_SIZE);
+    memcpy(buf, p, TARGET_PAGE_SIZE);
+    blen = qemu_put_compression_data(f, stream, buf, TARGET_PAGE_SIZE);
     if (blen < 0) {
         bytes_sent = 0;
         qemu_file_set_error(migrate_get_current()->to_dst_file, blen);
@@ -2547,7 +2550,7 @@  static void *do_data_decompress(void *opaque)
     DecompressParam *param = opaque;
     unsigned long pagesize;
     uint8_t *des;
-    int len;
+    int len, ret;
 
     qemu_mutex_lock(&param->mutex);
     while (!param->quit) {
@@ -2563,8 +2566,12 @@  static void *do_data_decompress(void *opaque)
              * not a problem because the dirty page will be retransferred
              * and uncompress() won't break the data in other pages.
              */
-            qemu_uncompress(&param->stream, des, pagesize,
-                            param->compbuf, len);
+            ret = qemu_uncompress(&param->stream, des, pagesize,
+                                  param->compbuf, len);
+            if (ret < 0) {
+                error_report("decompress data failed");
+                qemu_file_set_error(param->file, ret);
+            }
 
             qemu_mutex_lock(&decomp_done_lock);
             param->done = true;
@@ -2581,12 +2588,12 @@  static void *do_data_decompress(void *opaque)
     return NULL;
 }
 
-static void wait_for_decompress_done(void)
+static int wait_for_decompress_done(QEMUFile *f)
 {
     int idx, thread_count;
 
     if (!migrate_use_compression()) {
-        return;
+        return 0;
     }
 
     thread_count = migrate_decompress_threads();
@@ -2597,6 +2604,7 @@  static void wait_for_decompress_done(void)
         }
     }
     qemu_mutex_unlock(&decomp_done_lock);
+    return qemu_file_get_error(f);
 }
 
 static void compress_threads_load_cleanup(void)
@@ -2635,7 +2643,7 @@  static void compress_threads_load_cleanup(void)
     decomp_param = NULL;
 }
 
-static int compress_threads_load_setup(void)
+static int compress_threads_load_setup(QEMUFile *f)
 {
     int i, thread_count;
 
@@ -2654,6 +2662,7 @@  static int compress_threads_load_setup(void)
         }
         decomp_param[i].stream.opaque = &decomp_param[i];
 
+        decomp_param[i].file = f;
         qemu_mutex_init(&decomp_param[i].mutex);
         qemu_cond_init(&decomp_param[i].cond);
         decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
@@ -2708,7 +2717,7 @@  static void decompress_data_with_multi_threads(QEMUFile *f,
  */
 static int ram_load_setup(QEMUFile *f, void *opaque)
 {
-    if (compress_threads_load_setup()) {
+    if (compress_threads_load_setup(f)) {
         return -1;
     }
 
@@ -3063,7 +3072,7 @@  static int ram_load(QEMUFile *f, void *opaque, int version_id)
         }
     }
 
-    wait_for_decompress_done();
+    ret |= wait_for_decompress_done(f);
     rcu_read_unlock();
     trace_ram_load_complete(ret, seq_iter);
     return ret;