diff mbox series

[v2,04/13] ram.c: Reset result after sending queued data

Message ID 01e40e659d664ec12470fb6495f00f1ee78d5cb3.1681983401.git.lukasstraub2@web.de (mailing list archive)
State New, archived
Headers show
Series migration/ram.c: Refactor compress code | expand

Commit Message

Lukas Straub April 20, 2023, 9:48 a.m. UTC
And take the param->mutex lock for the whole section to ensure
thread-safety.
Now, it is explicitly clear if there is no queued data to send.
Before, this was handled by param->file stream being empty and thus
qemu_put_qemu_file() not sending anything.

This will be used in the next commits to move save_page_header()
out of compress code.

Signed-off-by: Lukas Straub <lukasstraub2@web.de>
---
 migration/ram.c | 32 ++++++++++++++++++++++----------
 1 file changed, 22 insertions(+), 10 deletions(-)

--
2.40.0

Comments

Juan Quintela April 28, 2023, 11:59 a.m. UTC | #1
Lukas Straub <lukasstraub2@web.de> wrote:
> And take the param->mutex lock for the whole section to ensure
> thread-safety.
> Now, it is explicitly clear if there is no queued data to send.
> Before, this was handled by param->file stream being empty and thus
> qemu_put_qemu_file() not sending anything.
>
> This will be used in the next commits to move save_page_header()
> out of compress code.
>
> Signed-off-by: Lukas Straub <lukasstraub2@web.de>

Reviewed-by: Juan Quintela <quintela@redhat.com>

If you do more changes around here, please consider:

> @@ -1540,13 +1547,16 @@ static void flush_compressed_data(RAMState *rs)
>      for (idx = 0; idx < thread_count; idx++) {

Move

CompressParam *param = &comp_param[idx];

to here, and use it also for the locks.
I will even think about calling the variable just p.

And once there, everything under the sun except this uses i as a for
variable, not idx O:-)

>          qemu_mutex_lock(&comp_param[idx].mutex);
>          if (!comp_param[idx].quit) {
> -            len = qemu_put_qemu_file(ms->to_dst_file, comp_param[idx].file);
> +            CompressParam *param = &comp_param[idx];

Move this declaration

> +            len = qemu_put_qemu_file(ms->to_dst_file, param->file);
> +            compress_reset_result(param);
> +
>              /*
>               * it's safe to fetch zero_page without holding comp_done_lock
>               * as there is no further request submitted to the thread,
>               * i.e, the thread should be waiting for a request at this point.
>               */
> -            update_compress_thread_counts(&comp_param[idx], len);
> +            update_compress_thread_counts(param, len);
>          }
>          qemu_mutex_unlock(&comp_param[idx].mutex);
>      }
> @@ -1571,15 +1581,17 @@ static int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset)
>  retry:
>      for (idx = 0; idx < thread_count; idx++) {
>          if (comp_param[idx].done) {
> -            comp_param[idx].done = false;


Same here.
diff mbox series

Patch

diff --git a/migration/ram.c b/migration/ram.c
index 820b4ebaeb..5ca0f115cf 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -1519,6 +1519,13 @@  update_compress_thread_counts(const CompressParam *param, int bytes_xmit)

 static bool save_page_use_compression(RAMState *rs);

+static inline void compress_reset_result(CompressParam *param)
+{
+    param->result = RES_NONE;
+    param->block = NULL;
+    param->offset = 0;
+}
+
 static void flush_compressed_data(RAMState *rs)
 {
     MigrationState *ms = migrate_get_current();
@@ -1540,13 +1547,16 @@  static void flush_compressed_data(RAMState *rs)
     for (idx = 0; idx < thread_count; idx++) {
         qemu_mutex_lock(&comp_param[idx].mutex);
         if (!comp_param[idx].quit) {
-            len = qemu_put_qemu_file(ms->to_dst_file, comp_param[idx].file);
+            CompressParam *param = &comp_param[idx];
+            len = qemu_put_qemu_file(ms->to_dst_file, param->file);
+            compress_reset_result(param);
+
             /*
              * it's safe to fetch zero_page without holding comp_done_lock
              * as there is no further request submitted to the thread,
              * i.e, the thread should be waiting for a request at this point.
              */
-            update_compress_thread_counts(&comp_param[idx], len);
+            update_compress_thread_counts(param, len);
         }
         qemu_mutex_unlock(&comp_param[idx].mutex);
     }
@@ -1571,15 +1581,17 @@  static int compress_page_with_multi_thread(RAMBlock *block, ram_addr_t offset)
 retry:
     for (idx = 0; idx < thread_count; idx++) {
         if (comp_param[idx].done) {
-            comp_param[idx].done = false;
-            bytes_xmit = qemu_put_qemu_file(ms->to_dst_file,
-                                            comp_param[idx].file);
-            qemu_mutex_lock(&comp_param[idx].mutex);
-            set_compress_params(&comp_param[idx], block, offset);
-            qemu_cond_signal(&comp_param[idx].cond);
-            qemu_mutex_unlock(&comp_param[idx].mutex);
+            CompressParam *param = &comp_param[idx];
+            qemu_mutex_lock(&param->mutex);
+            param->done = false;
+            bytes_xmit = qemu_put_qemu_file(ms->to_dst_file, param->file);
+            compress_reset_result(param);
+            set_compress_params(param, block, offset);
+
+            update_compress_thread_counts(param, bytes_xmit);
+            qemu_cond_signal(&param->cond);
+            qemu_mutex_unlock(&param->mutex);
             pages = 1;
-            update_compress_thread_counts(&comp_param[idx], bytes_xmit);
             break;
         }
     }