diff mbox

[v2,04/10] migration: detect compression and decompression errors

Message ID 20180327091043.30220-5-xiaoguangrong@tencent.com (mailing list archive)
State New, archived
Headers show

Commit Message

Xiao Guangrong March 27, 2018, 9:10 a.m. UTC
From: Xiao Guangrong <xiaoguangrong@tencent.com>

Currently the page being compressed is allowed to be updated by
the VM on the source QEMU, correspondingly the destination QEMU
just ignores the decompression error. However, we completely miss
the chance to catch real errors, then the VM is corrupted silently

To make the migration more robuster, we copy the page to a buffer
first to avoid it being written by VM, then detect and handle the
errors of both compression and decompression errors properly

Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
---
 migration/qemu-file.c |  4 ++--
 migration/ram.c       | 55 +++++++++++++++++++++++++++++++++++----------------
 2 files changed, 40 insertions(+), 19 deletions(-)

Comments

Peter Xu March 28, 2018, 9:59 a.m. UTC | #1
On Tue, Mar 27, 2018 at 05:10:37PM +0800, guangrong.xiao@gmail.com wrote:

[...]

> -static int compress_threads_load_setup(void)
> +static int compress_threads_load_setup(QEMUFile *f)
>  {
>      int i, thread_count;
>  
> @@ -2665,6 +2685,7 @@ static int compress_threads_load_setup(void)
>          }
>          decomp_param[i].stream.opaque = &decomp_param[i];
>  
> +        decomp_param[i].file = f;

On the source side the error will be set via:

        qemu_file_set_error(migrate_get_current()->to_dst_file, blen);

Maybe we can do similar things using migrate_incoming_get_current() to
avoid caching the QEMUFile multiple times?

I think both are not good since qemu_file_set_error() can be called by
multiple threads, but it's only setting a fault value so maybe it's
fine.  Other than that it looks good to me.

Thanks,
Xiao Guangrong March 29, 2018, 3:51 a.m. UTC | #2
On 03/28/2018 05:59 PM, Peter Xu wrote:
> On Tue, Mar 27, 2018 at 05:10:37PM +0800, guangrong.xiao@gmail.com wrote:
> 
> [...]
> 
>> -static int compress_threads_load_setup(void)
>> +static int compress_threads_load_setup(QEMUFile *f)
>>   {
>>       int i, thread_count;
>>   
>> @@ -2665,6 +2685,7 @@ static int compress_threads_load_setup(void)
>>           }
>>           decomp_param[i].stream.opaque = &decomp_param[i];
>>   
>> +        decomp_param[i].file = f;
> 
> On the source side the error will be set via:
> 
>          qemu_file_set_error(migrate_get_current()->to_dst_file, blen);
> 
> Maybe we can do similar things using migrate_incoming_get_current() to
> avoid caching the QEMUFile multiple times?
> 

I have considered it, however, it can not work as the @file used by ram
loader is not the file got from migrate_incoming_get_current() under some
cases.

For example, in colo_process_incoming_thread(), the file passed to
qemu_loadvm_state() is a internal buffer and it is not easy to switch it
to incoming file.
Peter Xu March 29, 2018, 4:25 a.m. UTC | #3
On Thu, Mar 29, 2018 at 11:51:03AM +0800, Xiao Guangrong wrote:
> 
> 
> On 03/28/2018 05:59 PM, Peter Xu wrote:
> > On Tue, Mar 27, 2018 at 05:10:37PM +0800, guangrong.xiao@gmail.com wrote:
> > 
> > [...]
> > 
> > > -static int compress_threads_load_setup(void)
> > > +static int compress_threads_load_setup(QEMUFile *f)
> > >   {
> > >       int i, thread_count;
> > > @@ -2665,6 +2685,7 @@ static int compress_threads_load_setup(void)
> > >           }
> > >           decomp_param[i].stream.opaque = &decomp_param[i];
> > > +        decomp_param[i].file = f;
> > 
> > On the source side the error will be set via:
> > 
> >          qemu_file_set_error(migrate_get_current()->to_dst_file, blen);
> > 
> > Maybe we can do similar things using migrate_incoming_get_current() to
> > avoid caching the QEMUFile multiple times?
> > 
> 
> I have considered it, however, it can not work as the @file used by ram
> loader is not the file got from migrate_incoming_get_current() under some
> cases.
> 
> For example, in colo_process_incoming_thread(), the file passed to
> qemu_loadvm_state() is a internal buffer and it is not easy to switch it
> to incoming file.

I see. How about cache it in a global variable?  We have these
already:

    thread_count = migrate_decompress_threads();
    decompress_threads = g_new0(QemuThread, thread_count);
    decomp_param = g_new0(DecompressParam, thread_count);
    ...

IMHO we can add a new one too, at least we don't cache it multiple
times (after all decomp_param[i]s are global variables too).
Xiao Guangrong March 30, 2018, 3:11 a.m. UTC | #4
On 03/29/2018 12:25 PM, Peter Xu wrote:
> On Thu, Mar 29, 2018 at 11:51:03AM +0800, Xiao Guangrong wrote:
>>
>>
>> On 03/28/2018 05:59 PM, Peter Xu wrote:
>>> On Tue, Mar 27, 2018 at 05:10:37PM +0800, guangrong.xiao@gmail.com wrote:
>>>
>>> [...]
>>>
>>>> -static int compress_threads_load_setup(void)
>>>> +static int compress_threads_load_setup(QEMUFile *f)
>>>>    {
>>>>        int i, thread_count;
>>>> @@ -2665,6 +2685,7 @@ static int compress_threads_load_setup(void)
>>>>            }
>>>>            decomp_param[i].stream.opaque = &decomp_param[i];
>>>> +        decomp_param[i].file = f;
>>>
>>> On the source side the error will be set via:
>>>
>>>           qemu_file_set_error(migrate_get_current()->to_dst_file, blen);
>>>
>>> Maybe we can do similar things using migrate_incoming_get_current() to
>>> avoid caching the QEMUFile multiple times?
>>>
>>
>> I have considered it, however, it can not work as the @file used by ram
>> loader is not the file got from migrate_incoming_get_current() under some
>> cases.
>>
>> For example, in colo_process_incoming_thread(), the file passed to
>> qemu_loadvm_state() is a internal buffer and it is not easy to switch it
>> to incoming file.
> 
> I see. How about cache it in a global variable?  We have these
> already:
> 
>      thread_count = migrate_decompress_threads();
>      decompress_threads = g_new0(QemuThread, thread_count);
>      decomp_param = g_new0(DecompressParam, thread_count);
>      ...
> 
> IMHO we can add a new one too, at least we don't cache it multiple
> times (after all decomp_param[i]s are global variables too).
> 

Nice, that's good to me. Will add your Reviewed-by on this patch
as well if you do not mind. :)
Peter Xu April 2, 2018, 4:26 a.m. UTC | #5
On Fri, Mar 30, 2018 at 11:11:27AM +0800, Xiao Guangrong wrote:
> 
> 
> On 03/29/2018 12:25 PM, Peter Xu wrote:
> > On Thu, Mar 29, 2018 at 11:51:03AM +0800, Xiao Guangrong wrote:
> > > 
> > > 
> > > On 03/28/2018 05:59 PM, Peter Xu wrote:
> > > > On Tue, Mar 27, 2018 at 05:10:37PM +0800, guangrong.xiao@gmail.com wrote:
> > > > 
> > > > [...]
> > > > 
> > > > > -static int compress_threads_load_setup(void)
> > > > > +static int compress_threads_load_setup(QEMUFile *f)
> > > > >    {
> > > > >        int i, thread_count;
> > > > > @@ -2665,6 +2685,7 @@ static int compress_threads_load_setup(void)
> > > > >            }
> > > > >            decomp_param[i].stream.opaque = &decomp_param[i];
> > > > > +        decomp_param[i].file = f;
> > > > 
> > > > On the source side the error will be set via:
> > > > 
> > > >           qemu_file_set_error(migrate_get_current()->to_dst_file, blen);
> > > > 
> > > > Maybe we can do similar things using migrate_incoming_get_current() to
> > > > avoid caching the QEMUFile multiple times?
> > > > 
> > > 
> > > I have considered it, however, it can not work as the @file used by ram
> > > loader is not the file got from migrate_incoming_get_current() under some
> > > cases.
> > > 
> > > For example, in colo_process_incoming_thread(), the file passed to
> > > qemu_loadvm_state() is a internal buffer and it is not easy to switch it
> > > to incoming file.
> > 
> > I see. How about cache it in a global variable?  We have these
> > already:
> > 
> >      thread_count = migrate_decompress_threads();
> >      decompress_threads = g_new0(QemuThread, thread_count);
> >      decomp_param = g_new0(DecompressParam, thread_count);
> >      ...
> > 
> > IMHO we can add a new one too, at least we don't cache it multiple
> > times (after all decomp_param[i]s are global variables too).
> > 
> 
> Nice, that's good to me. Will add your Reviewed-by on this patch
> as well if you do not mind. :)

Yes, please. :)

Thanks,
diff mbox

Patch

diff --git a/migration/qemu-file.c b/migration/qemu-file.c
index e924cc23c5..a7614e8c28 100644
--- a/migration/qemu-file.c
+++ b/migration/qemu-file.c
@@ -710,9 +710,9 @@  ssize_t qemu_put_compression_data(QEMUFile *f, z_stream *stream,
     blen = qemu_compress_data(stream, f->buf + f->buf_index + sizeof(int32_t),
                               blen, p, size);
     if (blen < 0) {
-        error_report("Compress Failed!");
-        return 0;
+        return -1;
     }
+
     qemu_put_be32(f, blen);
     if (f->ops->writev_buffer) {
         add_to_iovec(f, f->buf + f->buf_index, blen, false);
diff --git a/migration/ram.c b/migration/ram.c
index 6b699650ca..e85191c1cb 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -269,7 +269,10 @@  struct CompressParam {
     QemuCond cond;
     RAMBlock *block;
     ram_addr_t offset;
+
+    /* internally used fields */
     z_stream stream;
+    uint8_t *originbuf;
 };
 typedef struct CompressParam CompressParam;
 
@@ -278,6 +281,7 @@  struct DecompressParam {
     bool quit;
     QemuMutex mutex;
     QemuCond cond;
+    QEMUFile *file;
     void *des;
     uint8_t *compbuf;
     int len;
@@ -302,7 +306,7 @@  static QemuMutex decomp_done_lock;
 static QemuCond decomp_done_cond;
 
 static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
-                                ram_addr_t offset);
+                                ram_addr_t offset, uint8_t *source_buf);
 
 static void *do_data_compress(void *opaque)
 {
@@ -318,7 +322,8 @@  static void *do_data_compress(void *opaque)
             param->block = NULL;
             qemu_mutex_unlock(&param->mutex);
 
-            do_compress_ram_page(param->file, &param->stream, block, offset);
+            do_compress_ram_page(param->file, &param->stream, block, offset,
+                                 param->originbuf);
 
             qemu_mutex_lock(&comp_done_lock);
             param->done = true;
@@ -372,6 +377,7 @@  static void compress_threads_save_cleanup(void)
         qemu_mutex_destroy(&comp_param[i].mutex);
         qemu_cond_destroy(&comp_param[i].cond);
         deflateEnd(&comp_param[i].stream);
+        g_free(comp_param[i].originbuf);
         comp_param[i].stream.opaque = NULL;
     }
     qemu_mutex_destroy(&comp_done_lock);
@@ -395,8 +401,14 @@  static int compress_threads_save_setup(void)
     qemu_cond_init(&comp_done_cond);
     qemu_mutex_init(&comp_done_lock);
     for (i = 0; i < thread_count; i++) {
+        comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
+        if (!comp_param[i].originbuf) {
+            goto exit;
+        }
+
         if (deflateInit(&comp_param[i].stream,
                            migrate_compress_level()) != Z_OK) {
+            g_free(comp_param[i].originbuf);
             goto exit;
         }
         comp_param[i].stream.opaque = &comp_param[i];
@@ -1055,7 +1067,7 @@  static int ram_save_page(RAMState *rs, PageSearchStatus *pss, bool last_stage)
 }
 
 static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
-                                ram_addr_t offset)
+                                ram_addr_t offset, uint8_t *source_buf)
 {
     RAMState *rs = ram_state;
     int bytes_sent, blen;
@@ -1063,7 +1075,14 @@  static int do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
 
     bytes_sent = save_page_header(rs, f, block, offset |
                                   RAM_SAVE_FLAG_COMPRESS_PAGE);
-    blen = qemu_put_compression_data(f, stream, p, TARGET_PAGE_SIZE);
+
+    /*
+     * copy it to a internal buffer to avoid it being modified by VM
+     * so that we can catch up the error during compression and
+     * decompression
+     */
+    memcpy(source_buf, p, TARGET_PAGE_SIZE);
+    blen = qemu_put_compression_data(f, stream, source_buf, TARGET_PAGE_SIZE);
     if (blen < 0) {
         bytes_sent = 0;
         qemu_file_set_error(migrate_get_current()->to_dst_file, blen);
@@ -2557,7 +2576,7 @@  static void *do_data_decompress(void *opaque)
     DecompressParam *param = opaque;
     unsigned long pagesize;
     uint8_t *des;
-    int len;
+    int len, ret;
 
     qemu_mutex_lock(&param->mutex);
     while (!param->quit) {
@@ -2568,13 +2587,13 @@  static void *do_data_decompress(void *opaque)
             qemu_mutex_unlock(&param->mutex);
 
             pagesize = TARGET_PAGE_SIZE;
-            /* qemu_uncompress_data() will return failed in some case,
-             * especially when the page is dirtied when doing the compression,
-             * it's not a problem because the dirty page will be retransferred
-             * and uncompress() won't break the data in other pages.
-             */
-            qemu_uncompress_data(&param->stream, des, pagesize, param->compbuf,
-                                 len);
+
+            ret = qemu_uncompress_data(&param->stream, des, pagesize,
+                                       param->compbuf, len);
+            if (ret < 0) {
+                error_report("decompress data failed");
+                qemu_file_set_error(param->file, ret);
+            }
 
             qemu_mutex_lock(&decomp_done_lock);
             param->done = true;
@@ -2591,12 +2610,12 @@  static void *do_data_decompress(void *opaque)
     return NULL;
 }
 
-static void wait_for_decompress_done(void)
+static int wait_for_decompress_done(QEMUFile *f)
 {
     int idx, thread_count;
 
     if (!migrate_use_compression()) {
-        return;
+        return 0;
     }
 
     thread_count = migrate_decompress_threads();
@@ -2607,6 +2626,7 @@  static void wait_for_decompress_done(void)
         }
     }
     qemu_mutex_unlock(&decomp_done_lock);
+    return qemu_file_get_error(f);
 }
 
 static void compress_threads_load_cleanup(void)
@@ -2646,7 +2666,7 @@  static void compress_threads_load_cleanup(void)
     decomp_param = NULL;
 }
 
-static int compress_threads_load_setup(void)
+static int compress_threads_load_setup(QEMUFile *f)
 {
     int i, thread_count;
 
@@ -2665,6 +2685,7 @@  static int compress_threads_load_setup(void)
         }
         decomp_param[i].stream.opaque = &decomp_param[i];
 
+        decomp_param[i].file = f;
         qemu_mutex_init(&decomp_param[i].mutex);
         qemu_cond_init(&decomp_param[i].cond);
         decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
@@ -2719,7 +2740,7 @@  static void decompress_data_with_multi_threads(QEMUFile *f,
  */
 static int ram_load_setup(QEMUFile *f, void *opaque)
 {
-    if (compress_threads_load_setup()) {
+    if (compress_threads_load_setup(f)) {
         return -1;
     }
 
@@ -3074,7 +3095,7 @@  static int ram_load(QEMUFile *f, void *opaque, int version_id)
         }
     }
 
-    wait_for_decompress_done();
+    ret |= wait_for_decompress_done(f);
     rcu_read_unlock();
     trace_ram_load_complete(ret, seq_iter);
     return ret;