diff mbox

[12/17] migration: really use multiple pages at a time

Message ID 1485207141-1941-13-git-send-email-quintela@redhat.com (mailing list archive)
State New, archived
Headers show

Commit Message

Juan Quintela Jan. 23, 2017, 9:32 p.m. UTC
We now send several pages at a time each time that we wake up a thread.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 44 ++++++++++++++++++++++++++++++++++++++------
 1 file changed, 38 insertions(+), 6 deletions(-)

Comments

Dr. David Alan Gilbert Feb. 3, 2017, 10:54 a.m. UTC | #1
* Juan Quintela (quintela@redhat.com) wrote:
> We now send several pages at a time each time that we wakeup a thread.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c | 44 ++++++++++++++++++++++++++++++++++++++------
>  1 file changed, 38 insertions(+), 6 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index 9d7bc64..1267730 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -391,6 +391,13 @@ void migrate_compress_threads_create(void)
> 
>  /* Multiple fd's */
> 
> +
> +typedef struct {
> +    int num;
> +    int size;
> +    uint8_t **address;
> +} multifd_pages_t;

The naming is odd for QEMU; should be MultiFDPages ?
You might want to make num & size unsigned.

I was trying to understand why you store 'size' - is that because you worry about
someone changing the size parameter while we're running?  But given that we call
init in a few places I'm not sure it covers it.


>  struct MultiFDSendParams {
>      /* not changed */
>      QemuThread thread;
> @@ -400,7 +407,7 @@ struct MultiFDSendParams {
>      /* protected by param mutex */
>      bool quit;
>      bool started;
> -    uint8_t *address;
> +    multifd_pages_t pages;
>      /* protected by multifd mutex */
>      bool done;
>  };
> @@ -424,8 +431,8 @@ static void *multifd_send_thread(void *opaque)
> 
>      qemu_mutex_lock(&params->mutex);
>      while (!params->quit){
> -        if (params->address) {
> -            params->address = 0;
> +        if (params->pages.num) {
> +            params->pages.num = 0;
>              qemu_mutex_unlock(&params->mutex);
>              qemu_mutex_lock(&multifd_send_mutex);
>              params->done = true;
> @@ -473,6 +480,13 @@ void migrate_multifd_send_threads_join(void)
>      multifd_send = NULL;
>  }
> 
> +static void multifd_init_group(multifd_pages_t *pages)
> +{
> +    pages->num = 0;
> +    pages->size = migrate_multifd_group();
> +    pages->address = g_malloc0(pages->size * sizeof(uint8_t *));

g_new0(uint8_t *, pages->size)

> +}
> +
>  void migrate_multifd_send_threads_create(void)
>  {
>      int i, thread_count;
> @@ -491,7 +505,7 @@ void migrate_multifd_send_threads_create(void)
>          multifd_send[i].quit = false;
>          multifd_send[i].started = false;
>          multifd_send[i].done = true;
> -        multifd_send[i].address = 0;
> +        multifd_init_group(&multifd_send[i].pages);
>          multifd_send[i].c = socket_send_channel_create();
>          if(!multifd_send[i].c) {
>              error_report("Error creating a send channel");
> @@ -511,8 +525,22 @@ void migrate_multifd_send_threads_create(void)
> 
>  static int multifd_send_page(uint8_t *address)

Can you comment multifd_send_page to explain what it returns.

(Do we really need u16 for fd number? More than 256 streams would seem
surprising).

>  {
> -    int i, thread_count;
> +    int i, j, thread_count;
>      bool found = false;
> +    static multifd_pages_t pages;
> +    static bool once = false;
> +
> +    if (!once) {
> +        multifd_init_group(&pages);
> +        once = true;
> +    }
> +
> +    pages.address[pages.num] = address;
> +    pages.num++;
> +
> +    if (pages.num < (pages.size - 1)) {
> +        return UINT16_MAX;
> +    }
> 
>      thread_count = migrate_multifd_threads();
>      qemu_mutex_lock(&multifd_send_mutex);
> @@ -530,7 +558,11 @@ static int multifd_send_page(uint8_t *address)
>      }
>      qemu_mutex_unlock(&multifd_send_mutex);
>      qemu_mutex_lock(&multifd_send[i].mutex);
> -    multifd_send[i].address = address;
> +    multifd_send[i].pages.num = pages.num;
> +    for(j = 0; j < pages.size; j++) {
> +        multifd_send[i].pages.address[j] = pages.address[j];
> +    }
> +    pages.num = 0;
>      qemu_cond_signal(&multifd_send[i].cond);
>      qemu_mutex_unlock(&multifd_send[i].mutex);
> 
> -- 
> 2.9.3

Dave

> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Juan Quintela Feb. 13, 2017, 4:47 p.m. UTC | #2
"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> We now send several pages at a time each time that we wakeup a thread.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> ---
>>  migration/ram.c | 44 ++++++++++++++++++++++++++++++++++++++------
>>  1 file changed, 38 insertions(+), 6 deletions(-)
>> 
>> diff --git a/migration/ram.c b/migration/ram.c
>> index 9d7bc64..1267730 100644
>> --- a/migration/ram.c
>> +++ b/migration/ram.c
>> @@ -391,6 +391,13 @@ void migrate_compress_threads_create(void)
>> 
>>  /* Multiple fd's */
>> 
>> +
>> +typedef struct {
>> +    int num;
>> +    int size;
>> +    uint8_t **address;
>> +} multifd_pages_t;
>
> The naming is odd for QEMU; should be MultiFDPages ?
> You might want to make num & size unsigned.

ok.

> I was trying to understand why you store 'size' - is that because you worry about
> someone changing the size parameter while we're running?  But given that we call
> init in a few places I'm not sure it covers it.

No.  I am planning to change the code so that I call:

ram_save_block(&multifd_pages, &size,...)

And it returns at most "size" pages, depending on whatever is easy to
do.  The plan is not to change blocks or do any expensive operation: just
return the next "size" pages available that are easy to pick.


>> +    pages->num = 0;
>> +    pages->size = migrate_multifd_group();
>> +    pages->address = g_malloc0(pages->size * sizeof(uint8_t *));
>
> g_new0(uint8_t *, pages->size)

Changing it; I don't really care O:-)

>> +}
>> +
>>  void migrate_multifd_send_threads_create(void)
>>  {
>>      int i, thread_count;
>> @@ -491,7 +505,7 @@ void migrate_multifd_send_threads_create(void)
>>          multifd_send[i].quit = false;
>>          multifd_send[i].started = false;
>>          multifd_send[i].done = true;
>> -        multifd_send[i].address = 0;
>> +        multifd_init_group(&multifd_send[i].pages);
>>          multifd_send[i].c = socket_send_channel_create();
>>          if(!multifd_send[i].c) {
>>              error_report("Error creating a send channel");
>> @@ -511,8 +525,22 @@ void migrate_multifd_send_threads_create(void)
>> 
>>  static int multifd_send_page(uint8_t *address)
>
> Can you comment multifd_send_page to explain what it returns.
>
> (Do we really need u16 for fd number? More than 256 streams would seem
> surprising).

I *guess* (big guess) that people are going to use the same number of
threads as CPUs.  And we are going to have more than 256 in the near
future, so it looked more future-proof to use uint16_t.  I don't really
care though.

Thanks, Juan.
diff mbox

Patch

diff --git a/migration/ram.c b/migration/ram.c
index 9d7bc64..1267730 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -391,6 +391,13 @@  void migrate_compress_threads_create(void)

 /* Multiple fd's */

+
+typedef struct {
+    int num;
+    int size;
+    uint8_t **address;
+} multifd_pages_t;
+
 struct MultiFDSendParams {
     /* not changed */
     QemuThread thread;
@@ -400,7 +407,7 @@  struct MultiFDSendParams {
     /* protected by param mutex */
     bool quit;
     bool started;
-    uint8_t *address;
+    multifd_pages_t pages;
     /* protected by multifd mutex */
     bool done;
 };
@@ -424,8 +431,8 @@  static void *multifd_send_thread(void *opaque)

     qemu_mutex_lock(&params->mutex);
     while (!params->quit){
-        if (params->address) {
-            params->address = 0;
+        if (params->pages.num) {
+            params->pages.num = 0;
             qemu_mutex_unlock(&params->mutex);
             qemu_mutex_lock(&multifd_send_mutex);
             params->done = true;
@@ -473,6 +480,13 @@  void migrate_multifd_send_threads_join(void)
     multifd_send = NULL;
 }

+static void multifd_init_group(multifd_pages_t *pages)
+{
+    pages->num = 0;
+    pages->size = migrate_multifd_group();
+    pages->address = g_malloc0(pages->size * sizeof(uint8_t *));
+}
+
 void migrate_multifd_send_threads_create(void)
 {
     int i, thread_count;
@@ -491,7 +505,7 @@  void migrate_multifd_send_threads_create(void)
         multifd_send[i].quit = false;
         multifd_send[i].started = false;
         multifd_send[i].done = true;
-        multifd_send[i].address = 0;
+        multifd_init_group(&multifd_send[i].pages);
         multifd_send[i].c = socket_send_channel_create();
         if(!multifd_send[i].c) {
             error_report("Error creating a send channel");
@@ -511,8 +525,22 @@  void migrate_multifd_send_threads_create(void)

 static int multifd_send_page(uint8_t *address)
 {
-    int i, thread_count;
+    int i, j, thread_count;
     bool found = false;
+    static multifd_pages_t pages;
+    static bool once = false;
+
+    if (!once) {
+        multifd_init_group(&pages);
+        once = true;
+    }
+
+    pages.address[pages.num] = address;
+    pages.num++;
+
+    if (pages.num < (pages.size - 1)) {
+        return UINT16_MAX;
+    }

     thread_count = migrate_multifd_threads();
     qemu_mutex_lock(&multifd_send_mutex);
@@ -530,7 +558,11 @@  static int multifd_send_page(uint8_t *address)
     }
     qemu_mutex_unlock(&multifd_send_mutex);
     qemu_mutex_lock(&multifd_send[i].mutex);
-    multifd_send[i].address = address;
+    multifd_send[i].pages.num = pages.num;
+    for(j = 0; j < pages.size; j++) {
+        multifd_send[i].pages.address[j] = pages.address[j];
+    }
+    pages.num = 0;
     qemu_cond_signal(&multifd_send[i].cond);
     qemu_mutex_unlock(&multifd_send[i].mutex);