diff mbox series

[21/21] migration/multifd: Compute transferred bytes correctly

Message ID 20230508130909.65420-22-quintela@redhat.com (mailing list archive)
State New, archived
Headers show
Series Migration: More migration atomic counters | expand

Commit Message

Juan Quintela May 8, 2023, 1:09 p.m. UTC
In the past, we had to put the in the main thread all the operations
related with sizes due to qemu_file not beeing thread safe.  As now
all counters are atomic, we can update the counters just after the
do the write.  As an aditional bonus, we are able to use the right
value for the compression methods.  Right now we were assuming that
there were no compression at all.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/multifd.c | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

Comments

Peter Xu May 18, 2023, 4:32 p.m. UTC | #1
On Mon, May 08, 2023 at 03:09:09PM +0200, Juan Quintela wrote:
> In the past, we had to put the in the main thread all the operations
> related with sizes due to qemu_file not beeing thread safe.  As now
> all counters are atomic, we can update the counters just after the
> do the write.  As an aditional bonus, we are able to use the right
> value for the compression methods.  Right now we were assuming that
> there were no compression at all.

Maybe worth mention that initial packet is also accounted after this.

> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>

Two more trivial nits:

> ---
>  migration/multifd.c | 13 ++++++++-----
>  1 file changed, 8 insertions(+), 5 deletions(-)
> 
> diff --git a/migration/multifd.c b/migration/multifd.c
> index 9d2ade7abc..3a19d8e304 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -175,6 +175,7 @@ void multifd_register_ops(int method, MultiFDMethods *ops)
>  static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
>  {
>      MultiFDInit_t msg = {};
> +    size_t size = sizeof(msg);
>      int ret;
>  
>      msg.magic = cpu_to_be32(MULTIFD_MAGIC);
> @@ -182,10 +183,12 @@ static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
>      msg.id = p->id;
>      memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
>  
> -    ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
> +    ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
>      if (ret != 0) {
>          return -1;
>      }
> +    stat64_add(&mig_stats.multifd_bytes, size);
> +    stat64_add(&mig_stats.transferred, size);
>      return 0;
>  }
>  
> @@ -396,7 +399,6 @@ static int multifd_send_pages(QEMUFile *f)
>      static int next_channel;
>      MultiFDSendParams *p = NULL; /* make happy gcc */
>      MultiFDPages_t *pages = multifd_send_state->pages;
> -    uint64_t transferred;
>  
>      if (qatomic_read(&multifd_send_state->exiting)) {
>          return -1;
> @@ -431,10 +433,7 @@ static int multifd_send_pages(QEMUFile *f)
>      p->packet_num = multifd_send_state->packet_num++;
>      multifd_send_state->pages = p->pages;
>      p->pages = pages;
> -    transferred = ((uint64_t) pages->num) * p->page_size + p->packet_len;
>      qemu_mutex_unlock(&p->mutex);
> -    stat64_add(&mig_stats.transferred, transferred);
> -    stat64_add(&mig_stats.multifd_bytes, transferred);
>      qemu_sem_post(&p->sem);
>  
>      return 1;
> @@ -716,6 +715,8 @@ static void *multifd_send_thread(void *opaque)
>                  if (ret != 0) {
>                      break;
>                  }
> +                stat64_add(&mig_stats.multifd_bytes, p->packet_len);
> +                stat64_add(&mig_stats.transferred, p->packet_len);
>              } else {
>                  /* Send header using the same writev call */
>                  p->iov[0].iov_len = p->packet_len;
> @@ -728,6 +729,8 @@ static void *multifd_send_thread(void *opaque)
>                  break;
>              }
>  
> +            stat64_add(&mig_stats.multifd_bytes, p->next_packet_size);
> +            stat64_add(&mig_stats.transferred, p->next_packet_size);

Two nits:

Maybe merge the two so half atomic operations?

Also maybe also worth having a inline helper for adding both multifd_bytes
and transferred?

With/without that, all look good:

Reviewed-by: Peter Xu <peterx@redhat.com>

Thanks,

>              qemu_mutex_lock(&p->mutex);
>              p->pending_job--;
>              qemu_mutex_unlock(&p->mutex);
> -- 
> 2.40.0
>
Juan Quintela May 18, 2023, 4:40 p.m. UTC | #2
Peter Xu <peterx@redhat.com> wrote:
> On Mon, May 08, 2023 at 03:09:09PM +0200, Juan Quintela wrote:
>> In the past, we had to put the in the main thread all the operations
>> related with sizes due to qemu_file not beeing thread safe.  As now
>> all counters are atomic, we can update the counters just after the
>> do the write.  As an aditional bonus, we are able to use the right
>> value for the compression methods.  Right now we were assuming that
>> there were no compression at all.
>
> Maybe worth mention that initial packet is also accounted after this.

Ok.
>>  
>> +            stat64_add(&mig_stats.multifd_bytes, p->next_packet_size);
>> +            stat64_add(&mig_stats.transferred, p->next_packet_size);
>
> Two nits:
>
> Maybe merge the two so half atomic operations?

On my tree, to send after this got in:

77fdd3475c migration: Remove transferred atomic counter

O:-)

> Also maybe also worth having a inline helper for adding both multifd_bytes
> and transferred?

I am removing it.

After next set of packates:

rate limit is calulated as:

begining_period = migration_transferred_bytes();
...

bytes_this_period = migration_transferred_bytes() - begining_period;

transferred is calculated as:
- multifd_bytes + qemu_file_bytes;

So things get really simple.  As all counters are atomic, you do a
write and after the write to increse the write size to the qemu_file or
to the multifd_bytes.  And that is it.


> With/without that, all look good:
>
> Reviewed-by: Peter Xu <peterx@redhat.com>

Thanks, Juan.
Peter Xu May 18, 2023, 6:32 p.m. UTC | #3
On Thu, May 18, 2023 at 06:40:18PM +0200, Juan Quintela wrote:
> Peter Xu <peterx@redhat.com> wrote:
> > On Mon, May 08, 2023 at 03:09:09PM +0200, Juan Quintela wrote:
> >> In the past, we had to put the in the main thread all the operations
> >> related with sizes due to qemu_file not beeing thread safe.  As now
> >> all counters are atomic, we can update the counters just after the
> >> do the write.  As an aditional bonus, we are able to use the right
> >> value for the compression methods.  Right now we were assuming that
> >> there were no compression at all.
> >
> > Maybe worth mention that initial packet is also accounted after this.
> 
> Ok.
> >>  
> >> +            stat64_add(&mig_stats.multifd_bytes, p->next_packet_size);
> >> +            stat64_add(&mig_stats.transferred, p->next_packet_size);
> >
> > Two nits:
> >
> > Maybe merge the two so half atomic operations?
> 
> On my tree, to send after this got in:
> 
> 77fdd3475c migration: Remove transferred atomic counter
> 
> O:-)

Ah this looks even better, indeed. :)

What I meant was we can also do atomic update in one shot for both
next_packet_size + packet_len.

> 
> > Also maybe also worth having a inline helper for adding both multifd_bytes
> > and transferred?
> 
> I am removing it.
> 
> After next set of packates:
> 
> rate limit is calulated as:
> 
> begining_period = migration_transferred_bytes();
> ...
> 
> bytes_this_period = migration_transferred_bytes() - begining_period;
> 
> transferred is calculated as:
> - multifd_bytes + qemu_file_bytes;
> 
> So things get really simple.  As all counters are atomic, you do a
> write and after the write to increse the write size to the qemu_file or
> to the multifd_bytes.  And that is it.

Agreed.

Thanks,
diff mbox series

Patch

diff --git a/migration/multifd.c b/migration/multifd.c
index 9d2ade7abc..3a19d8e304 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -175,6 +175,7 @@  void multifd_register_ops(int method, MultiFDMethods *ops)
 static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
 {
     MultiFDInit_t msg = {};
+    size_t size = sizeof(msg);
     int ret;
 
     msg.magic = cpu_to_be32(MULTIFD_MAGIC);
@@ -182,10 +183,12 @@  static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
     msg.id = p->id;
     memcpy(msg.uuid, &qemu_uuid.data, sizeof(msg.uuid));
 
-    ret = qio_channel_write_all(p->c, (char *)&msg, sizeof(msg), errp);
+    ret = qio_channel_write_all(p->c, (char *)&msg, size, errp);
     if (ret != 0) {
         return -1;
     }
+    stat64_add(&mig_stats.multifd_bytes, size);
+    stat64_add(&mig_stats.transferred, size);
     return 0;
 }
 
@@ -396,7 +399,6 @@  static int multifd_send_pages(QEMUFile *f)
     static int next_channel;
     MultiFDSendParams *p = NULL; /* make happy gcc */
     MultiFDPages_t *pages = multifd_send_state->pages;
-    uint64_t transferred;
 
     if (qatomic_read(&multifd_send_state->exiting)) {
         return -1;
@@ -431,10 +433,7 @@  static int multifd_send_pages(QEMUFile *f)
     p->packet_num = multifd_send_state->packet_num++;
     multifd_send_state->pages = p->pages;
     p->pages = pages;
-    transferred = ((uint64_t) pages->num) * p->page_size + p->packet_len;
     qemu_mutex_unlock(&p->mutex);
-    stat64_add(&mig_stats.transferred, transferred);
-    stat64_add(&mig_stats.multifd_bytes, transferred);
     qemu_sem_post(&p->sem);
 
     return 1;
@@ -716,6 +715,8 @@  static void *multifd_send_thread(void *opaque)
                 if (ret != 0) {
                     break;
                 }
+                stat64_add(&mig_stats.multifd_bytes, p->packet_len);
+                stat64_add(&mig_stats.transferred, p->packet_len);
             } else {
                 /* Send header using the same writev call */
                 p->iov[0].iov_len = p->packet_len;
@@ -728,6 +729,8 @@  static void *multifd_send_thread(void *opaque)
                 break;
             }
 
+            stat64_add(&mig_stats.multifd_bytes, p->next_packet_size);
+            stat64_add(&mig_stats.transferred, p->next_packet_size);
             qemu_mutex_lock(&p->mutex);
             p->pending_job--;
             qemu_mutex_unlock(&p->mutex);