diff mbox

migration: Fix multi-thread compression bug

Message ID 1462257521-16075-1-git-send-email-liang.z.li@intel.com (mailing list archive)
State New, archived
Headers show

Commit Message

Liang Li May 3, 2016, 6:38 a.m. UTC
Recently, a bug related to the multi-thread compression feature for
live migration was reported. The destination side will be blocked
during live migration if there is a heavy workload in the host and a
memory-intensive workload in the guest; this is most likely to happen
when there is one decompression thread.

Some parts of the decompression code are incorrect:
1. The main thread, which receives data from the source side, enters a
busy loop to wait for a free decompression thread.
2. A lock is needed to protect decomp_param[idx].start, because
it is checked in the main thread and updated in the decompression
thread.

Fix these two issues by following the code pattern for compression.

Reported-by: Daniel P. Berrange <berrange@redhat.com>
Signed-off-by: Liang Li <liang.z.li@intel.com>
---
 migration/ram.c | 38 +++++++++++++++++++++++++++-----------
 1 file changed, 27 insertions(+), 11 deletions(-)

Comments

Liang Li May 3, 2016, 6:43 a.m. UTC | #1
Cc: berrange@redhat.com

Liang

> -----Original Message-----
> From: Li, Liang Z
> Sent: Tuesday, May 03, 2016 2:39 PM
> To: qemu-devel@nongnu.org
> Cc: quintela@redhat.com; amit.shah@redhat.com; dgilbert@redhat.com; Li,
> Liang Z
> Subject: [PATCH] migration: Fix multi-thread compression bug
> 
> Recently, a bug related to multiple thread compression feature for live
> migration is reported. The destination side will be blocked during live
> migration if there are heavy workload in host and memory intensive
> workload in guest, this is most likely to happen when there is one
> decompression thread.
> 
> Some parts of the decompression code are incorrect:
> 1. The main thread receives data from source side will enter a busy loop to
> wait for a free decompression thread.
> 2. A lock is needed to protect the decomp_param[idx]->start, because it is
> checked in the main thread and is updated in the decompression thread.
> 
> Fix these two issues by following the code pattern for compression.
> 
> Reported-by: Daniel P. Berrange <berrange@redhat.com>
> Signed-off-by: Liang Li <liang.z.li@intel.com>
> ---
>  migration/ram.c | 38 +++++++++++++++++++++++++++-----------
>  1 file changed, 27 insertions(+), 11 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c index 3f05738..7ab6ab5 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -263,6 +263,7 @@ typedef struct CompressParam CompressParam;
> 
>  struct DecompressParam {
>      bool start;
> +    bool done;
>      QemuMutex mutex;
>      QemuCond cond;
>      void *des;
> @@ -287,6 +288,8 @@ static bool quit_comp_thread;  static bool
> quit_decomp_thread;  static DecompressParam *decomp_param;  static
> QemuThread *decompress_threads;
> +static QemuMutex decomp_done_lock;
> +static QemuCond decomp_done_cond;
> 
>  static int do_compress_ram_page(CompressParam *param);
> 
> @@ -834,6 +837,7 @@ static inline void start_compression(CompressParam
> *param)
> 
>  static inline void start_decompression(DecompressParam *param)  {
> +    param->done = false;
>      qemu_mutex_lock(&param->mutex);
>      param->start = true;
>      qemu_cond_signal(&param->cond);
> @@ -2193,19 +2197,24 @@ static void *do_data_decompress(void *opaque)
>          qemu_mutex_lock(&param->mutex);
>          while (!param->start && !quit_decomp_thread) {
>              qemu_cond_wait(&param->cond, &param->mutex);
> +        }
> +        if (!quit_decomp_thread) {
>              pagesize = TARGET_PAGE_SIZE;
> -            if (!quit_decomp_thread) {
> -                /* uncompress() will return failed in some case, especially
> -                 * when the page is dirted when doing the compression, it's
> -                 * not a problem because the dirty page will be retransferred
> -                 * and uncompress() won't break the data in other pages.
> -                 */
> -                uncompress((Bytef *)param->des, &pagesize,
> -                           (const Bytef *)param->compbuf, param->len);
> -            }
> -            param->start = false;
> +            /* uncompress() will return failed in some case, especially
> +             * when the page is dirted when doing the compression, it's
> +             * not a problem because the dirty page will be retransferred
> +             * and uncompress() won't break the data in other pages.
> +             */
> +            uncompress((Bytef *)param->des, &pagesize,
> +                       (const Bytef *)param->compbuf, param->len);
>          }
> +        param->start = false;
>          qemu_mutex_unlock(&param->mutex);
> +
> +        qemu_mutex_lock(&decomp_done_lock);
> +        param->done = true;
> +        qemu_cond_signal(&decomp_done_cond);
> +        qemu_mutex_unlock(&decomp_done_lock);
>      }
> 
>      return NULL;
> @@ -2219,10 +2228,13 @@ void migrate_decompress_threads_create(void)
>      decompress_threads = g_new0(QemuThread, thread_count);
>      decomp_param = g_new0(DecompressParam, thread_count);
>      quit_decomp_thread = false;
> +    qemu_mutex_init(&decomp_done_lock);
> +    qemu_cond_init(&decomp_done_cond);
>      for (i = 0; i < thread_count; i++) {
>          qemu_mutex_init(&decomp_param[i].mutex);
>          qemu_cond_init(&decomp_param[i].cond);
>          decomp_param[i].compbuf =
> g_malloc0(compressBound(TARGET_PAGE_SIZE));
> +        decomp_param[i].done = true;
>          qemu_thread_create(decompress_threads + i, "decompress",
>                             do_data_decompress, decomp_param + i,
>                             QEMU_THREAD_JOINABLE); @@ -2258,9 +2270,10 @@ static
> void decompress_data_with_multi_threads(QEMUFile *f,
>      int idx, thread_count;
> 
>      thread_count = migrate_decompress_threads();
> +    qemu_mutex_lock(&decomp_done_lock);
>      while (true) {
>          for (idx = 0; idx < thread_count; idx++) {
> -            if (!decomp_param[idx].start) {
> +            if (decomp_param[idx].done) {
>                  qemu_get_buffer(f, decomp_param[idx].compbuf, len);
>                  decomp_param[idx].des = host;
>                  decomp_param[idx].len = len; @@ -2270,8 +2283,11 @@ static void
> decompress_data_with_multi_threads(QEMUFile *f,
>          }
>          if (idx < thread_count) {
>              break;
> +        } else {
> +            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
>          }
>      }
> +    qemu_mutex_unlock(&decomp_done_lock);
>  }
> 
>  /*
> --
> 1.9.1
Daniel P. Berrangé May 3, 2016, 10:44 a.m. UTC | #2
On Tue, May 03, 2016 at 02:38:41PM +0800, Liang Li wrote:
> Recently, a bug related to multiple thread compression feature for
> live migration is reported. The destination side will be blocked
> during live migration if there are heavy workload in host and
> memory intensive workload in guest, this is most likely to happen
> when there is one decompression thread.
> 
> Some parts of the decompression code are incorrect:
> 1. The main thread receives data from source side will enter a busy
> loop to wait for a free decompression thread.
> 2. A lock is needed to protect the decomp_param[idx]->start, because
> it is checked in the main thread and is updated in the decompression
> thread.
> 
> Fix these two issues by following the code pattern for compression.
> 
> Reported-by: Daniel P. Berrange <berrange@redhat.com>
> Signed-off-by: Liang Li <liang.z.li@intel.com>
> ---
>  migration/ram.c | 38 +++++++++++++++++++++++++++-----------
>  1 file changed, 27 insertions(+), 11 deletions(-)

Tested-by: Daniel P. Berrange <berrange@redhat.com>
Reviewed-by: Daniel P. Berrange <berrange@redhat.com>


Regards,
Daniel
Dr. David Alan Gilbert May 3, 2016, 4:39 p.m. UTC | #3
* Liang Li (liang.z.li@intel.com) wrote:
> Recently, a bug related to multiple thread compression feature for
> live migration is reported. The destination side will be blocked
> during live migration if there are heavy workload in host and
> memory intensive workload in guest, this is most likely to happen
> when there is one decompression thread.
> 
> Some parts of the decompression code are incorrect:
> 1. The main thread receives data from source side will enter a busy
> loop to wait for a free decompression thread.
> 2. A lock is needed to protect the decomp_param[idx]->start, because
> it is checked in the main thread and is updated in the decompression
> thread.
> 
> Fix these two issues by following the code pattern for compression.
> 

OK, I think that's an improvement - but I have a question.
Since it's an improvement (and basically now the same as compress):

Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

however, my question is:
What guarantees that all of the decompression has finished by the time
the VM is started?  I see in migration/migration.c that we have:

    if (!global_state_received() ||
        global_state_get_runstate() == RUN_STATE_RUNNING) {
        if (autostart) {
            vm_start();
        } else {
            runstate_set(RUN_STATE_PAUSED);
        }
    } else {
        runstate_set(global_state_get_runstate());
    }
    migrate_decompress_threads_join();

so I guess that join ensures we have decompressed everything - but
that needs to happen before we do the vm_start, not after - actually
we need to make sure the decompress of RAM has happened before we
start loading any of the other devices (since they may read from RAM).

So, do we need something at the end of ram_load() ?

Dave

> Reported-by: Daniel P. Berrange <berrange@redhat.com>
> Signed-off-by: Liang Li <liang.z.li@intel.com>
> ---
>  migration/ram.c | 38 +++++++++++++++++++++++++++-----------
>  1 file changed, 27 insertions(+), 11 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 3f05738..7ab6ab5 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -263,6 +263,7 @@ typedef struct CompressParam CompressParam;
>  
>  struct DecompressParam {
>      bool start;
> +    bool done;
>      QemuMutex mutex;
>      QemuCond cond;
>      void *des;
> @@ -287,6 +288,8 @@ static bool quit_comp_thread;
>  static bool quit_decomp_thread;
>  static DecompressParam *decomp_param;
>  static QemuThread *decompress_threads;
> +static QemuMutex decomp_done_lock;
> +static QemuCond decomp_done_cond;
>  
>  static int do_compress_ram_page(CompressParam *param);
>  
> @@ -834,6 +837,7 @@ static inline void start_compression(CompressParam *param)
>  
>  static inline void start_decompression(DecompressParam *param)
>  {
> +    param->done = false;
>      qemu_mutex_lock(&param->mutex);
>      param->start = true;
>      qemu_cond_signal(&param->cond);
> @@ -2193,19 +2197,24 @@ static void *do_data_decompress(void *opaque)
>          qemu_mutex_lock(&param->mutex);
>          while (!param->start && !quit_decomp_thread) {
>              qemu_cond_wait(&param->cond, &param->mutex);
> +        }
> +        if (!quit_decomp_thread) {
>              pagesize = TARGET_PAGE_SIZE;
> -            if (!quit_decomp_thread) {
> -                /* uncompress() will return failed in some case, especially
> -                 * when the page is dirted when doing the compression, it's
> -                 * not a problem because the dirty page will be retransferred
> -                 * and uncompress() won't break the data in other pages.
> -                 */
> -                uncompress((Bytef *)param->des, &pagesize,
> -                           (const Bytef *)param->compbuf, param->len);
> -            }
> -            param->start = false;
> +            /* uncompress() will return failed in some case, especially
> +             * when the page is dirted when doing the compression, it's
> +             * not a problem because the dirty page will be retransferred
> +             * and uncompress() won't break the data in other pages.
> +             */
> +            uncompress((Bytef *)param->des, &pagesize,
> +                       (const Bytef *)param->compbuf, param->len);
>          }
> +        param->start = false;
>          qemu_mutex_unlock(&param->mutex);
> +
> +        qemu_mutex_lock(&decomp_done_lock);
> +        param->done = true;
> +        qemu_cond_signal(&decomp_done_cond);
> +        qemu_mutex_unlock(&decomp_done_lock);
>      }
>  
>      return NULL;
> @@ -2219,10 +2228,13 @@ void migrate_decompress_threads_create(void)
>      decompress_threads = g_new0(QemuThread, thread_count);
>      decomp_param = g_new0(DecompressParam, thread_count);
>      quit_decomp_thread = false;
> +    qemu_mutex_init(&decomp_done_lock);
> +    qemu_cond_init(&decomp_done_cond);
>      for (i = 0; i < thread_count; i++) {
>          qemu_mutex_init(&decomp_param[i].mutex);
>          qemu_cond_init(&decomp_param[i].cond);
>          decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
> +        decomp_param[i].done = true;
>          qemu_thread_create(decompress_threads + i, "decompress",
>                             do_data_decompress, decomp_param + i,
>                             QEMU_THREAD_JOINABLE);
> @@ -2258,9 +2270,10 @@ static void decompress_data_with_multi_threads(QEMUFile *f,
>      int idx, thread_count;
>  
>      thread_count = migrate_decompress_threads();
> +    qemu_mutex_lock(&decomp_done_lock);
>      while (true) {
>          for (idx = 0; idx < thread_count; idx++) {
> -            if (!decomp_param[idx].start) {
> +            if (decomp_param[idx].done) {
>                  qemu_get_buffer(f, decomp_param[idx].compbuf, len);
>                  decomp_param[idx].des = host;
>                  decomp_param[idx].len = len;
> @@ -2270,8 +2283,11 @@ static void decompress_data_with_multi_threads(QEMUFile *f,
>          }
>          if (idx < thread_count) {
>              break;
> +        } else {
> +            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
>          }
>      }
> +    qemu_mutex_unlock(&decomp_done_lock);
>  }
>  
>  /*
> -- 
> 1.9.1
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Liang Li May 4, 2016, 1:10 a.m. UTC | #4
> * Liang Li (liang.z.li@intel.com) wrote:
> > Recently, a bug related to multiple thread compression feature for
> > live migration is reported. The destination side will be blocked
> > during live migration if there are heavy workload in host and memory
> > intensive workload in guest, this is most likely to happen when there
> > is one decompression thread.
> >
> > Some parts of the decompression code are incorrect:
> > 1. The main thread receives data from source side will enter a busy
> > loop to wait for a free decompression thread.
> > 2. A lock is needed to protect the decomp_param[idx]->start, because
> > it is checked in the main thread and is updated in the decompression
> > thread.
> >
> > Fix these two issues by following the code pattern for compression.
> >
> 
> OK, I think that's an improvement - but I have a question.
> Since it's an improvement (and basically now the same as compress):
> 
> Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
> 
> however, my question is:
> What guarantee's that all of the decompression has finished by the time the
> VM is started?  I see in migration/migration.c that we have:
> 
>     if (!global_state_received() ||
>         global_state_get_runstate() == RUN_STATE_RUNNING) {
>         if (autostart) {
>             vm_start();
>         } else {
>             runstate_set(RUN_STATE_PAUSED);
>         }
>     } else {
>         runstate_set(global_state_get_runstate());
>     }
>     migrate_decompress_threads_join();
> 
> so I guess that join ensures we have decompressed everything - but that
> needs to happen before we do the vm_start, not after - actually we need to
> make sure the decompress of RAM has happened before we start loading
> any of the other devices (since they may read from RAM).
> 
> So, do we need something at the end of ram_load() ?
> 

Yes, we should make sure all the decompression has finished before vm_start().

I actually did that long ago in the POC code; I can't remember why the related code was
left out of the upstream code. I will submit another patch for this.

Thanks!

Liang

> Dave
> 


> > Reported-by: Daniel P. Berrange <berrange@redhat.com>
> > Signed-off-by: Liang Li <liang.z.li@intel.com>
> > ---
> >  migration/ram.c | 38 +++++++++++++++++++++++++++-----------
> >  1 file changed, 27 insertions(+), 11 deletions(-)
> >
> > diff --git a/migration/ram.c b/migration/ram.c index 3f05738..7ab6ab5
> > 100644
> > --- a/migration/ram.c
> > +++ b/migration/ram.c
> > @@ -263,6 +263,7 @@ typedef struct CompressParam CompressParam;
> >
> >  struct DecompressParam {
> >      bool start;
> > +    bool done;
> >      QemuMutex mutex;
> >      QemuCond cond;
> >      void *des;
> > @@ -287,6 +288,8 @@ static bool quit_comp_thread;  static bool
> > quit_decomp_thread;  static DecompressParam *decomp_param;  static
> > QemuThread *decompress_threads;
> > +static QemuMutex decomp_done_lock;
> > +static QemuCond decomp_done_cond;
> >
> >  static int do_compress_ram_page(CompressParam *param);
> >
> > @@ -834,6 +837,7 @@ static inline void
> start_compression(CompressParam
> > *param)
> >
> >  static inline void start_decompression(DecompressParam *param)  {
> > +    param->done = false;
> >      qemu_mutex_lock(&param->mutex);
> >      param->start = true;
> >      qemu_cond_signal(&param->cond);
> > @@ -2193,19 +2197,24 @@ static void *do_data_decompress(void
> *opaque)
> >          qemu_mutex_lock(&param->mutex);
> >          while (!param->start && !quit_decomp_thread) {
> >              qemu_cond_wait(&param->cond, &param->mutex);
> > +        }
> > +        if (!quit_decomp_thread) {
> >              pagesize = TARGET_PAGE_SIZE;
> > -            if (!quit_decomp_thread) {
> > -                /* uncompress() will return failed in some case, especially
> > -                 * when the page is dirted when doing the compression, it's
> > -                 * not a problem because the dirty page will be retransferred
> > -                 * and uncompress() won't break the data in other pages.
> > -                 */
> > -                uncompress((Bytef *)param->des, &pagesize,
> > -                           (const Bytef *)param->compbuf, param->len);
> > -            }
> > -            param->start = false;
> > +            /* uncompress() will return failed in some case, especially
> > +             * when the page is dirted when doing the compression, it's
> > +             * not a problem because the dirty page will be retransferred
> > +             * and uncompress() won't break the data in other pages.
> > +             */
> > +            uncompress((Bytef *)param->des, &pagesize,
> > +                       (const Bytef *)param->compbuf, param->len);
> >          }
> > +        param->start = false;
> >          qemu_mutex_unlock(&param->mutex);
> > +
> > +        qemu_mutex_lock(&decomp_done_lock);
> > +        param->done = true;
> > +        qemu_cond_signal(&decomp_done_cond);
> > +        qemu_mutex_unlock(&decomp_done_lock);
> >      }
> >
> >      return NULL;
> > @@ -2219,10 +2228,13 @@ void
> migrate_decompress_threads_create(void)
> >      decompress_threads = g_new0(QemuThread, thread_count);
> >      decomp_param = g_new0(DecompressParam, thread_count);
> >      quit_decomp_thread = false;
> > +    qemu_mutex_init(&decomp_done_lock);
> > +    qemu_cond_init(&decomp_done_cond);
> >      for (i = 0; i < thread_count; i++) {
> >          qemu_mutex_init(&decomp_param[i].mutex);
> >          qemu_cond_init(&decomp_param[i].cond);
> >          decomp_param[i].compbuf =
> > g_malloc0(compressBound(TARGET_PAGE_SIZE));
> > +        decomp_param[i].done = true;
> >          qemu_thread_create(decompress_threads + i, "decompress",
> >                             do_data_decompress, decomp_param + i,
> >                             QEMU_THREAD_JOINABLE); @@ -2258,9 +2270,10
> > @@ static void decompress_data_with_multi_threads(QEMUFile *f,
> >      int idx, thread_count;
> >
> >      thread_count = migrate_decompress_threads();
> > +    qemu_mutex_lock(&decomp_done_lock);
> >      while (true) {
> >          for (idx = 0; idx < thread_count; idx++) {
> > -            if (!decomp_param[idx].start) {
> > +            if (decomp_param[idx].done) {
> >                  qemu_get_buffer(f, decomp_param[idx].compbuf, len);
> >                  decomp_param[idx].des = host;
> >                  decomp_param[idx].len = len; @@ -2270,8 +2283,11 @@
> > static void decompress_data_with_multi_threads(QEMUFile *f,
> >          }
> >          if (idx < thread_count) {
> >              break;
> > +        } else {
> > +            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
> >          }
> >      }
> > +    qemu_mutex_unlock(&decomp_done_lock);
> >  }
> >
> >  /*
> > --
> > 1.9.1
> >
> --
> Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Juan Quintela May 4, 2016, 9:03 a.m. UTC | #5
"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Liang Li (liang.z.li@intel.com) wrote:
>> Recently, a bug related to multiple thread compression feature for
>> live migration is reported. The destination side will be blocked
>> during live migration if there are heavy workload in host and
>> memory intensive workload in guest, this is most likely to happen
>> when there is one decompression thread.
>> 
>> Some parts of the decompression code are incorrect:
>> 1. The main thread receives data from source side will enter a busy
>> loop to wait for a free decompression thread.
>> 2. A lock is needed to protect the decomp_param[idx]->start, because
>> it is checked in the main thread and is updated in the decompression
>> thread.
>> 
>> Fix these two issues by following the code pattern for compression.
>> 
>
> OK, I think that's an improvement - but I have a question.
> Since it's an improvement (and basically now the same as compress):
>
> Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
>
> however, my question is:
> What guarantee's that all of the decompression has finished by the time
> the VM is started?  I see in migration/migration.c that we have:
>
>     if (!global_state_received() ||
>         global_state_get_runstate() == RUN_STATE_RUNNING) {
>         if (autostart) {
>             vm_start();
>         } else {
>             runstate_set(RUN_STATE_PAUSED);
>         }
>     } else {
>         runstate_set(global_state_get_runstate());
>     }
>     migrate_decompress_threads_join();

You are right here.  If we don't want to do the join earlier, we need to
do the equivalent: check that all threads have set param->done to true.
That is what I do in the multiple-fd code.

Later, Juan.
Juan Quintela May 4, 2016, 9:11 a.m. UTC | #6
Liang Li <liang.z.li@intel.com> wrote:
> Recently, a bug related to multiple thread compression feature for
> live migration is reported. The destination side will be blocked
> during live migration if there are heavy workload in host and
> memory intensive workload in guest, this is most likely to happen
> when there is one decompression thread.
>
> Some parts of the decompression code are incorrect:
> 1. The main thread receives data from source side will enter a busy
> loop to wait for a free decompression thread.
> 2. A lock is needed to protect the decomp_param[idx]->start, because
> it is checked in the main thread and is updated in the decompression
> thread.
>
> Fix these two issues by following the code pattern for compression.
>
> Reported-by: Daniel P. Berrange <berrange@redhat.com>
> Signed-off-by: Liang Li <liang.z.li@intel.com>

step in the right direction, so:
Reviewed-by: Juan Quintela <quintela@redhat.com>

but I am still not sure that this is
enough.  If you have the chance, look at the multiple-fd code that I
posted; it is very similar to this.


>  struct DecompressParam {

What protects start, and what protects done?


>      bool start;
> +    bool done;
>      QemuMutex mutex;
>      QemuCond cond;
>      void *des;
> @@ -287,6 +288,8 @@ static bool quit_comp_thread;
>  static bool quit_decomp_thread;
>  static DecompressParam *decomp_param;
>  static QemuThread *decompress_threads;
> +static QemuMutex decomp_done_lock;
> +static QemuCond decomp_done_cond;
>  
>  static int do_compress_ram_page(CompressParam *param);
>  
> @@ -834,6 +837,7 @@ static inline void start_compression(CompressParam *param)
>  
>  static inline void start_decompression(DecompressParam *param)
>  {

Here nothing protects done

> +    param->done = false;
>      qemu_mutex_lock(&param->mutex);
>      param->start = true;
>      qemu_cond_signal(&param->cond);
> @@ -2193,19 +2197,24 @@ static void *do_data_decompress(void *opaque)
>          qemu_mutex_lock(&param->mutex);

we are looking at quit_decomp_thread and nothing protects it


>          while (!param->start && !quit_decomp_thread) {
>              qemu_cond_wait(&param->cond, &param->mutex);
> +        }
> +        if (!quit_decomp_thread) {
>              pagesize = TARGET_PAGE_SIZE;
> -            if (!quit_decomp_thread) {
> -                /* uncompress() will return failed in some case, especially
> -                 * when the page is dirted when doing the compression, it's
> -                 * not a problem because the dirty page will be retransferred
> -                 * and uncompress() won't break the data in other pages.
> -                 */
> -                uncompress((Bytef *)param->des, &pagesize,
> -                           (const Bytef *)param->compbuf, param->len);
> -            }
> -            param->start = false;
> +            /* uncompress() will return failed in some case, especially
> +             * when the page is dirted when doing the compression, it's
> +             * not a problem because the dirty page will be retransferred
> +             * and uncompress() won't break the data in other pages.
> +             */
> +            uncompress((Bytef *)param->des, &pagesize,
> +                       (const Bytef *)param->compbuf, param->len);

We are calling uncompress (a slow operation) with param->mutex taken, is
there any reason why we can't just put the param->* vars in locals?

>          }
> +        param->start = false;

Why are we setting start to false when we are _not_ decompressing a
page?  I think this line should be inside the loop.

>          qemu_mutex_unlock(&param->mutex);
> +
> +        qemu_mutex_lock(&decomp_done_lock);
> +        param->done = true;

here param->done is protected by decomp_done_lock.

> +        qemu_cond_signal(&decomp_done_cond);
> +        qemu_mutex_unlock(&decomp_done_lock);
>      }
>  
>      return NULL;
> @@ -2219,10 +2228,13 @@ void migrate_decompress_threads_create(void)
>      decompress_threads = g_new0(QemuThread, thread_count);
>      decomp_param = g_new0(DecompressParam, thread_count);
>      quit_decomp_thread = false;
> +    qemu_mutex_init(&decomp_done_lock);
> +    qemu_cond_init(&decomp_done_cond);
>      for (i = 0; i < thread_count; i++) {
>          qemu_mutex_init(&decomp_param[i].mutex);
>          qemu_cond_init(&decomp_param[i].cond);
>          decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
> +        decomp_param[i].done = true;
>          qemu_thread_create(decompress_threads + i, "decompress",
>                             do_data_decompress, decomp_param + i,
>                             QEMU_THREAD_JOINABLE);
> @@ -2258,9 +2270,10 @@ static void decompress_data_with_multi_threads(QEMUFile *f,
>      int idx, thread_count;
>  
>      thread_count = migrate_decompress_threads();
> +    qemu_mutex_lock(&decomp_done_lock);

we took decomp_done_lock

>      while (true) {
>          for (idx = 0; idx < thread_count; idx++) {
> -            if (!decomp_param[idx].start) {
> +            if (decomp_param[idx].done) {

and we can protect done with it.

>                  qemu_get_buffer(f, decomp_param[idx].compbuf, len);
>                  decomp_param[idx].des = host;
>                  decomp_param[idx].len = len;

but these should be protected by decomp_param[idx].mutex, no?

> @@ -2270,8 +2283,11 @@ static void decompress_data_with_multi_threads(QEMUFile *f,
>          }
>          if (idx < thread_count) {
>              break;
> +        } else {
> +            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
>          }
>      }
> +    qemu_mutex_unlock(&decomp_done_lock);
>  }
>  
>  /*

Thanks, Juan.
Liang Li May 4, 2016, 10:03 a.m. UTC | #7
> To: Li, Liang Z
> Cc: amit.shah@redhat.com; qemu-devel@nongnu.org; dgilbert@redhat.com
> Subject: Re: [Qemu-devel] [PATCH] migration: Fix multi-thread compression
> bug
> 
> Liang Li <liang.z.li@intel.com> wrote:
> > Recently, a bug related to multiple thread compression feature for
> > live migration is reported. The destination side will be blocked
> > during live migration if there are heavy workload in host and memory
> > intensive workload in guest, this is most likely to happen when there
> > is one decompression thread.
> >
> > Some parts of the decompression code are incorrect:
> > 1. The main thread receives data from source side will enter a busy
> > loop to wait for a free decompression thread.
> > 2. A lock is needed to protect the decomp_param[idx]->start, because
> > it is checked in the main thread and is updated in the decompression
> > thread.
> >
> > Fix these two issues by following the code pattern for compression.
> >
> > Reported-by: Daniel P. Berrange <berrange@redhat.com>
> > Signed-off-by: Liang Li <liang.z.li@intel.com>
> 
> step in the right direction, so:
> Reviewed-by: Juan Quintela <quintela@redhat.com>
> 
> but I am still not sure that this is
> enough.  if you have the change, look at the multiple-fd code that I posted, is
> very similar here.
> 
> 
> >  struct DecompressParam {
> 
> what protect start, and what protect done?
> 

decomp_param[i].mutex protects start, and decomp_done_lock
protects done.

> 
> >      bool start;
> > +    bool done;
> >      QemuMutex mutex;
> >      QemuCond cond;
> >      void *des;
> > @@ -287,6 +288,8 @@ static bool quit_comp_thread;  static bool
> > quit_decomp_thread;  static DecompressParam *decomp_param;  static
> > QemuThread *decompress_threads;
> > +static QemuMutex decomp_done_lock;
> > +static QemuCond decomp_done_cond;
> >
> >  static int do_compress_ram_page(CompressParam *param);
> >
> > @@ -834,6 +837,7 @@ static inline void
> start_compression(CompressParam
> > *param)
> >
> >  static inline void start_decompression(DecompressParam *param)  {
> 
> Here nothing protects done

start_decompression is called while holding the decomp_done_lock.

> 
> > +    param->done = false;
> >      qemu_mutex_lock(&param->mutex);
> >      param->start = true;
> >      qemu_cond_signal(&param->cond);
> > @@ -2193,19 +2197,24 @@ static void *do_data_decompress(void
> *opaque)
> >          qemu_mutex_lock(&param->mutex);
> 
> we are looking at quit_decomp_thread and nothing protects it
> 
> 
> >          while (!param->start && !quit_decomp_thread) {
> >              qemu_cond_wait(&param->cond, &param->mutex);
> > +        }
> > +        if (!quit_decomp_thread) {
> >              pagesize = TARGET_PAGE_SIZE;
> > -            if (!quit_decomp_thread) {
> > -                /* uncompress() will return failed in some case, especially
> > -                 * when the page is dirted when doing the compression, it's
> > -                 * not a problem because the dirty page will be retransferred
> > -                 * and uncompress() won't break the data in other pages.
> > -                 */
> > -                uncompress((Bytef *)param->des, &pagesize,
> > -                           (const Bytef *)param->compbuf, param->len);
> > -            }
> > -            param->start = false;
> > +            /* uncompress() will return failed in some case, especially
> > +             * when the page is dirted when doing the compression, it's
> > +             * not a problem because the dirty page will be retransferred
> > +             * and uncompress() won't break the data in other pages.
> > +             */
> > +            uncompress((Bytef *)param->des, &pagesize,
> > +                       (const Bytef *)param->compbuf, param->len);
> 
> We are calling uncompress (a slow operation) with param->mutex taken, is
> there any reason why we can't just put the param->* vars in locals?
> 
> >          }
> > +        param->start = false;
> 
> Why are we setting start to false when we _are_ not decompressing a page?
> I think this line should be inside the loop.
> 
> >          qemu_mutex_unlock(&param->mutex);
> > +
> > +        qemu_mutex_lock(&decomp_done_lock);
> > +        param->done = true;
> 
> here param->done is protected by decomp_done_lock.
> 
> > +        qemu_cond_signal(&decomp_done_cond);
> > +        qemu_mutex_unlock(&decomp_done_lock);
> >      }
> >
> >      return NULL;
> > @@ -2219,10 +2228,13 @@ void
> migrate_decompress_threads_create(void)
> >      decompress_threads = g_new0(QemuThread, thread_count);
> >      decomp_param = g_new0(DecompressParam, thread_count);
> >      quit_decomp_thread = false;
> > +    qemu_mutex_init(&decomp_done_lock);
> > +    qemu_cond_init(&decomp_done_cond);
> >      for (i = 0; i < thread_count; i++) {
> >          qemu_mutex_init(&decomp_param[i].mutex);
> >          qemu_cond_init(&decomp_param[i].cond);
> >          decomp_param[i].compbuf =
> > g_malloc0(compressBound(TARGET_PAGE_SIZE));
> > +        decomp_param[i].done = true;
> >          qemu_thread_create(decompress_threads + i, "decompress",
> >                             do_data_decompress, decomp_param + i,
> >                             QEMU_THREAD_JOINABLE); @@ -2258,9 +2270,10
> > @@ static void decompress_data_with_multi_threads(QEMUFile *f,
> >      int idx, thread_count;
> >
> >      thread_count = migrate_decompress_threads();
> > +    qemu_mutex_lock(&decomp_done_lock);
> 
> we took decomp_done_lock
> 
> >      while (true) {
> >          for (idx = 0; idx < thread_count; idx++) {
> > -            if (!decomp_param[idx].start) {
> > +            if (decomp_param[idx].done) {
> 
> and we can protecet done with it.
> 
> >                  qemu_get_buffer(f, decomp_param[idx].compbuf, len);
> >                  decomp_param[idx].des = host;
> >                  decomp_param[idx].len = len;
> 
> but this ones should be proteced by docomp_param[idx].mutex, no?

The code works correctly, but it looks confusing; it seems I should make the locking clearer.
I will try to change it by referring to your multi-fd code. Thanks!


Liang

> 
> > @@ -2270,8 +2283,11 @@ static void
> decompress_data_with_multi_threads(QEMUFile *f,
> >          }
> >          if (idx < thread_count) {
> >              break;
> > +        } else {
> > +            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
> >          }
> >      }
> > +    qemu_mutex_unlock(&decomp_done_lock);
> >  }
> >
> >  /*
> 
> Thanks, Juan.
diff mbox

Patch

diff --git a/migration/ram.c b/migration/ram.c
index 3f05738..7ab6ab5 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -263,6 +263,7 @@  typedef struct CompressParam CompressParam;
 
 struct DecompressParam {
     bool start;
+    bool done;
     QemuMutex mutex;
     QemuCond cond;
     void *des;
@@ -287,6 +288,8 @@  static bool quit_comp_thread;
 static bool quit_decomp_thread;
 static DecompressParam *decomp_param;
 static QemuThread *decompress_threads;
+static QemuMutex decomp_done_lock;
+static QemuCond decomp_done_cond;
 
 static int do_compress_ram_page(CompressParam *param);
 
@@ -834,6 +837,7 @@  static inline void start_compression(CompressParam *param)
 
 static inline void start_decompression(DecompressParam *param)
 {
+    param->done = false;
     qemu_mutex_lock(&param->mutex);
     param->start = true;
     qemu_cond_signal(&param->cond);
@@ -2193,19 +2197,24 @@  static void *do_data_decompress(void *opaque)
         qemu_mutex_lock(&param->mutex);
         while (!param->start && !quit_decomp_thread) {
             qemu_cond_wait(&param->cond, &param->mutex);
+        }
+        if (!quit_decomp_thread) {
             pagesize = TARGET_PAGE_SIZE;
-            if (!quit_decomp_thread) {
-                /* uncompress() will return failed in some case, especially
-                 * when the page is dirted when doing the compression, it's
-                 * not a problem because the dirty page will be retransferred
-                 * and uncompress() won't break the data in other pages.
-                 */
-                uncompress((Bytef *)param->des, &pagesize,
-                           (const Bytef *)param->compbuf, param->len);
-            }
-            param->start = false;
+            /* uncompress() will return failed in some case, especially
+             * when the page is dirted when doing the compression, it's
+             * not a problem because the dirty page will be retransferred
+             * and uncompress() won't break the data in other pages.
+             */
+            uncompress((Bytef *)param->des, &pagesize,
+                       (const Bytef *)param->compbuf, param->len);
         }
+        param->start = false;
         qemu_mutex_unlock(&param->mutex);
+
+        qemu_mutex_lock(&decomp_done_lock);
+        param->done = true;
+        qemu_cond_signal(&decomp_done_cond);
+        qemu_mutex_unlock(&decomp_done_lock);
     }
 
     return NULL;
@@ -2219,10 +2228,13 @@  void migrate_decompress_threads_create(void)
     decompress_threads = g_new0(QemuThread, thread_count);
     decomp_param = g_new0(DecompressParam, thread_count);
     quit_decomp_thread = false;
+    qemu_mutex_init(&decomp_done_lock);
+    qemu_cond_init(&decomp_done_cond);
     for (i = 0; i < thread_count; i++) {
         qemu_mutex_init(&decomp_param[i].mutex);
         qemu_cond_init(&decomp_param[i].cond);
         decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
+        decomp_param[i].done = true;
         qemu_thread_create(decompress_threads + i, "decompress",
                            do_data_decompress, decomp_param + i,
                            QEMU_THREAD_JOINABLE);
@@ -2258,9 +2270,10 @@  static void decompress_data_with_multi_threads(QEMUFile *f,
     int idx, thread_count;
 
     thread_count = migrate_decompress_threads();
+    qemu_mutex_lock(&decomp_done_lock);
     while (true) {
         for (idx = 0; idx < thread_count; idx++) {
-            if (!decomp_param[idx].start) {
+            if (decomp_param[idx].done) {
                 qemu_get_buffer(f, decomp_param[idx].compbuf, len);
                 decomp_param[idx].des = host;
                 decomp_param[idx].len = len;
@@ -2270,8 +2283,11 @@  static void decompress_data_with_multi_threads(QEMUFile *f,
         }
         if (idx < thread_count) {
             break;
+        } else {
+            qemu_cond_wait(&decomp_done_cond, &decomp_done_lock);
         }
     }
+    qemu_mutex_unlock(&decomp_done_lock);
 }
 
 /*