
[11/17] migration: Create thread infrastructure for multifd send side

Message ID 1485207141-1941-12-git-send-email-quintela@redhat.com (mailing list archive)
State New, archived

Commit Message

Juan Quintela Jan. 23, 2017, 9:32 p.m. UTC
We make the locking and the transfer of information explicit, even though we
are still transmitting things through the main thread.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 52 insertions(+), 1 deletion(-)

Comments

Paolo Bonzini Jan. 26, 2017, 12:38 p.m. UTC | #1
On 23/01/2017 22:32, Juan Quintela wrote:
> We make the locking and the transfer of information specific, even if we
> are still transmiting things through the main thread.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 52 insertions(+), 1 deletion(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index c71929e..9d7bc64 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -392,17 +392,25 @@ void migrate_compress_threads_create(void)
>  /* Multiple fd's */
> 
>  struct MultiFDSendParams {
> +    /* not changed */
>      QemuThread thread;
>      QIOChannel *c;
>      QemuCond cond;
>      QemuMutex mutex;
> +    /* protected by param mutex */
>      bool quit;
>      bool started;
> +    uint8_t *address;
> +    /* protected by multifd mutex */
> +    bool done;
>  };
>  typedef struct MultiFDSendParams MultiFDSendParams;
> 
>  static MultiFDSendParams *multifd_send;
> 
> +QemuMutex multifd_send_mutex;
> +QemuCond multifd_send_cond;

Having n+1 semaphores instead of n+1 cond/mutex pairs could be more
efficient.  See thread-pool.c for an example.

Paolo
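
(For reference, a rough sketch of the semaphore-based handoff Paolo is
suggesting, loosely modelled on thread-pool.c.  It assumes MultiFDSendParams
gains a QemuSemaphore "sem" in place of the cond/mutex pair and that
"address" doubles as the busy flag; the synchronisation around "address" is
glossed over, so treat it as an illustration rather than a drop-in
replacement:)

    /* posted by a worker each time it becomes idle */
    static QemuSemaphore multifd_send_sem;

    static void *multifd_send_thread(void *opaque)
    {
        MultiFDSendParams *p = opaque;

        while (true) {
            qemu_sem_wait(&p->sem);            /* sleep until given work */
            if (p->quit) {
                break;
            }
            /* ... transmit the page at p->address over p->c ... */
            p->address = NULL;
            qemu_sem_post(&multifd_send_sem);  /* advertise that we are idle */
        }
        return NULL;
    }

    static int multifd_send_page(uint8_t *address)
    {
        int i, thread_count = migrate_multifd_threads();

        qemu_sem_wait(&multifd_send_sem);      /* block until a worker is idle */
        for (i = 0; i < thread_count; i++) {
            if (!multifd_send[i].address) {
                multifd_send[i].address = address;
                qemu_sem_post(&multifd_send[i].sem);
                return 0;
            }
        }
        return -1;                             /* unreachable if the counts match */
    }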

>  static void *multifd_send_thread(void *opaque)
>  {
>      MultiFDSendParams *params = opaque;
> @@ -416,7 +424,17 @@ static void *multifd_send_thread(void *opaque)
> 
>      qemu_mutex_lock(&params->mutex);
>      while (!params->quit){
> -        qemu_cond_wait(&params->cond, &params->mutex);
> +        if (params->address) {
> +            params->address = 0;
> +            qemu_mutex_unlock(&params->mutex);
> +            qemu_mutex_lock(&multifd_send_mutex);
> +            params->done = true;
> +            qemu_cond_signal(&multifd_send_cond);
> +            qemu_mutex_unlock(&multifd_send_mutex);
> +            qemu_mutex_lock(&params->mutex);
> +        } else {
> +            qemu_cond_wait(&params->cond, &params->mutex);
> +        }
>      }
>      qemu_mutex_unlock(&params->mutex);
> 
> @@ -464,12 +482,16 @@ void migrate_multifd_send_threads_create(void)
>      }
>      thread_count = migrate_multifd_threads();
>      multifd_send = g_new0(MultiFDSendParams, thread_count);
> +    qemu_mutex_init(&multifd_send_mutex);
> +    qemu_cond_init(&multifd_send_cond);
>      for (i = 0; i < thread_count; i++) {
>          char thread_name[15];
>          qemu_mutex_init(&multifd_send[i].mutex);
>          qemu_cond_init(&multifd_send[i].cond);
>          multifd_send[i].quit = false;
>          multifd_send[i].started = false;
> +        multifd_send[i].done = true;
> +        multifd_send[i].address = 0;
>          multifd_send[i].c = socket_send_channel_create();
>          if(!multifd_send[i].c) {
>              error_report("Error creating a send channel");
> @@ -487,6 +509,34 @@ void migrate_multifd_send_threads_create(void)
>      }
>  }
> 
> +static int multifd_send_page(uint8_t *address)
> +{
> +    int i, thread_count;
> +    bool found = false;
> +
> +    thread_count = migrate_multifd_threads();
> +    qemu_mutex_lock(&multifd_send_mutex);
> +    while (!found) {
> +        for (i = 0; i < thread_count; i++) {
> +            if (multifd_send[i].done) {
> +                multifd_send[i].done = false;
> +                found = true;
> +                break;
> +            }
> +        }
> +        if (!found) {
> +            qemu_cond_wait(&multifd_send_cond, &multifd_send_mutex);
> +        }
> +    }
> +    qemu_mutex_unlock(&multifd_send_mutex);
> +    qemu_mutex_lock(&multifd_send[i].mutex);
> +    multifd_send[i].address = address;
> +    qemu_cond_signal(&multifd_send[i].cond);
> +    qemu_mutex_unlock(&multifd_send[i].mutex);
> +
> +    return 0;
> +}
> +
>  struct MultiFDRecvParams {
>      QemuThread thread;
>      QIOChannel *c;
> @@ -1015,6 +1065,7 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
>          *bytes_transferred +=
>              save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
>          qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
> +        multifd_send_page(p);
>          *bytes_transferred += TARGET_PAGE_SIZE;
>          pages = 1;
>          acct_info.norm_pages++;
>
Dr. David Alan Gilbert Feb. 2, 2017, 12:03 p.m. UTC | #2
* Juan Quintela (quintela@redhat.com) wrote:
> We make the locking and the transfer of information specific, even if we
> are still transmiting things through the main thread.
> 
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 52 insertions(+), 1 deletion(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index c71929e..9d7bc64 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -392,17 +392,25 @@ void migrate_compress_threads_create(void)
>  /* Multiple fd's */
> 
>  struct MultiFDSendParams {
> +    /* not changed */
>      QemuThread thread;
>      QIOChannel *c;
>      QemuCond cond;
>      QemuMutex mutex;
> +    /* protected by param mutex */
>      bool quit;
>      bool started;
> +    uint8_t *address;
> +    /* protected by multifd mutex */
> +    bool done;
>  };
>  typedef struct MultiFDSendParams MultiFDSendParams;
> 
>  static MultiFDSendParams *multifd_send;
> 
> +QemuMutex multifd_send_mutex;
> +QemuCond multifd_send_cond;
> +
>  static void *multifd_send_thread(void *opaque)
>  {
>      MultiFDSendParams *params = opaque;
> @@ -416,7 +424,17 @@ static void *multifd_send_thread(void *opaque)
> 
>      qemu_mutex_lock(&params->mutex);
>      while (!params->quit){
> -        qemu_cond_wait(&params->cond, &params->mutex);
> +        if (params->address) {
> +            params->address = 0;

This confused me (I wondered what happens to the 1st block), but
I see in the next patch this gets replaced by something more complex;
so I suggest just using params->dummy and commenting that it's about
to get replaced.
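
(Concretely, something along these lines, where "dummy" is just an
illustrative placeholder name for a wake-up flag that the next patch
replaces with the real per-page state:)

    while (!params->quit) {
        if (params->dummy) {
            /* placeholder work item; replaced by real page handling
             * in the next patch */
            params->dummy = false;
            qemu_mutex_unlock(&params->mutex);
            qemu_mutex_lock(&multifd_send_mutex);
            params->done = true;
            qemu_cond_signal(&multifd_send_cond);
            qemu_mutex_unlock(&multifd_send_mutex);
            qemu_mutex_lock(&params->mutex);
        } else {
            qemu_cond_wait(&params->cond, &params->mutex);
        }
    }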

> +            qemu_mutex_unlock(&params->mutex);
> +            qemu_mutex_lock(&multifd_send_mutex);
> +            params->done = true;
> +            qemu_cond_signal(&multifd_send_cond);
> +            qemu_mutex_unlock(&multifd_send_mutex);
> +            qemu_mutex_lock(&params->mutex);
> +        } else {
> +            qemu_cond_wait(&params->cond, &params->mutex);
> +        }
>      }
>      qemu_mutex_unlock(&params->mutex);
> 
> @@ -464,12 +482,16 @@ void migrate_multifd_send_threads_create(void)
>      }
>      thread_count = migrate_multifd_threads();
>      multifd_send = g_new0(MultiFDSendParams, thread_count);
> +    qemu_mutex_init(&multifd_send_mutex);
> +    qemu_cond_init(&multifd_send_cond);
>      for (i = 0; i < thread_count; i++) {
>          char thread_name[15];
>          qemu_mutex_init(&multifd_send[i].mutex);
>          qemu_cond_init(&multifd_send[i].cond);
>          multifd_send[i].quit = false;
>          multifd_send[i].started = false;
> +        multifd_send[i].done = true;
> +        multifd_send[i].address = 0;
>          multifd_send[i].c = socket_send_channel_create();
>          if(!multifd_send[i].c) {
>              error_report("Error creating a send channel");
> @@ -487,6 +509,34 @@ void migrate_multifd_send_threads_create(void)
>      }
>  }
> 
> +static int multifd_send_page(uint8_t *address)
> +{
> +    int i, thread_count;
> +    bool found = false;
> +
> +    thread_count = migrate_multifd_threads();
> +    qemu_mutex_lock(&multifd_send_mutex);
> +    while (!found) {
> +        for (i = 0; i < thread_count; i++) {
> +            if (multifd_send[i].done) {
> +                multifd_send[i].done = false;
> +                found = true;
> +                break;
> +            }
> +        }
> +        if (!found) {
> +            qemu_cond_wait(&multifd_send_cond, &multifd_send_mutex);
> +        }
> +    }
> +    qemu_mutex_unlock(&multifd_send_mutex);
> +    qemu_mutex_lock(&multifd_send[i].mutex);

Having a 'multifd_send_mutex' and a
         'multifd_send[i].mutex'
is pretty confusing!

> +    multifd_send[i].address = address;
> +    qemu_cond_signal(&multifd_send[i].cond);
> +    qemu_mutex_unlock(&multifd_send[i].mutex);
> +
> +    return 0;
> +}
> +
>  struct MultiFDRecvParams {
>      QemuThread thread;
>      QIOChannel *c;
> @@ -1015,6 +1065,7 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
>          *bytes_transferred +=
>              save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
>          qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
> +        multifd_send_page(p);
>          *bytes_transferred += TARGET_PAGE_SIZE;
>          pages = 1;
>          acct_info.norm_pages++;
> -- 
> 2.9.3

I think I'm pretty OK with this, but we'll see what it looks like
after you think about Paolo's suggestion; it does feel like it should
be possible to do the locking etc. more simply, I just don't know how.

Dave

--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
Juan Quintela Feb. 13, 2017, 4:38 p.m. UTC | #3
Paolo Bonzini <pbonzini@redhat.com> wrote:
> On 23/01/2017 22:32, Juan Quintela wrote:
>> We make the locking and the transfer of information specific, even if we
>> are still transmiting things through the main thread.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> ---
>>  migration/ram.c | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
>>  1 file changed, 52 insertions(+), 1 deletion(-)
>> 
>> diff --git a/migration/ram.c b/migration/ram.c
>> index c71929e..9d7bc64 100644
>> --- a/migration/ram.c
>> +++ b/migration/ram.c
>> @@ -392,17 +392,25 @@ void migrate_compress_threads_create(void)
>>  /* Multiple fd's */
>> 
>>  struct MultiFDSendParams {
>> +    /* not changed */
>>      QemuThread thread;
>>      QIOChannel *c;
>>      QemuCond cond;
>>      QemuMutex mutex;
>> +    /* protected by param mutex */
>>      bool quit;
>>      bool started;
>> +    uint8_t *address;
>> +    /* protected by multifd mutex */
>> +    bool done;
>>  };
>>  typedef struct MultiFDSendParams MultiFDSendParams;
>> 
>>  static MultiFDSendParams *multifd_send;
>> 
>> +QemuMutex multifd_send_mutex;
>> +QemuCond multifd_send_cond;
>
> Having n+1 semaphores instead of n+1 cond/mutex pairs could be more
> efficient.  See thread-pool.c for an example.

Did that.  See the next version.

Only partial success.  It goes faster, and the code is somewhat easier.
But on reception I end up having to add 3 semaphores per thread (OK, I
could get down to only two by reusing them, but still).  On the send side
I got speedups; on the reception side I didn't, and I haven't found the
cause yet.

Thanks, Juan.
Juan Quintela Feb. 13, 2017, 4:40 p.m. UTC | #4
"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> We make the locking and the transfer of information specific, even if we
>> are still transmiting things through the main thread.
>> 
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> ---
>>  migration/ram.c | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
>>  1 file changed, 52 insertions(+), 1 deletion(-)
>> 
>> diff --git a/migration/ram.c b/migration/ram.c
>> index c71929e..9d7bc64 100644
>> --- a/migration/ram.c
>> +++ b/migration/ram.c
>> @@ -392,17 +392,25 @@ void migrate_compress_threads_create(void)
>>  /* Multiple fd's */
>> 
>>  struct MultiFDSendParams {
>> +    /* not changed */
>>      QemuThread thread;
>>      QIOChannel *c;
>>      QemuCond cond;
>>      QemuMutex mutex;
>> +    /* protected by param mutex */
>>      bool quit;
>>      bool started;
>> +    uint8_t *address;
>> +    /* protected by multifd mutex */
>> +    bool done;
>>  };
>>  typedef struct MultiFDSendParams MultiFDSendParams;
>> 
>>  static MultiFDSendParams *multifd_send;
>> 
>> +QemuMutex multifd_send_mutex;
>> +QemuCond multifd_send_cond;
>> +
>>  static void *multifd_send_thread(void *opaque)
>>  {
>>      MultiFDSendParams *params = opaque;
>> @@ -416,7 +424,17 @@ static void *multifd_send_thread(void *opaque)
>> 
>>      qemu_mutex_lock(&params->mutex);
>>      while (!params->quit){
>> -        qemu_cond_wait(&params->cond, &params->mutex);
>> +        if (params->address) {
>> +            params->address = 0;
>
> This confused me (I wondered what happens to the 1st block) but
> I see in the next patch this gets replaced by something more complex;
> so I suggest just using params->dummy and commented it's about
> to get replaced.

If you prefer.  I wanted to minimize the change in the next patch;
otherwise I also have to change the places where I check the value of
address.


>> +    qemu_mutex_unlock(&multifd_send_mutex);
>> +    qemu_mutex_lock(&multifd_send[i].mutex);
>
> Having a 'multifd_send_mutex' and a
>          'multifd_send[i].mutex'
> is pretty confusing!

For a different reason, I have moved all of the

  multifd_send[i].  accesses to "p->"

Better?
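
(I.e. taking a local pointer so the dispatch path reads roughly as
follows; sketch only, the actual change is in the next version of the
series:)

    MultiFDSendParams *p = &multifd_send[i];

    qemu_mutex_lock(&p->mutex);
    p->address = address;
    qemu_cond_signal(&p->cond);
    qemu_mutex_unlock(&p->mutex);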


>
>> +    multifd_send[i].address = address;
>> +    qemu_cond_signal(&multifd_send[i].cond);
>> +    qemu_mutex_unlock(&multifd_send[i].mutex);
>> +
>> +    return 0;
>> +}
>> +
>>  struct MultiFDRecvParams {
>>      QemuThread thread;
>>      QIOChannel *c;
>> @@ -1015,6 +1065,7 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
>>          *bytes_transferred +=
>>              save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
>>          qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
>> +        multifd_send_page(p);
>>          *bytes_transferred += TARGET_PAGE_SIZE;
>>          pages = 1;
>>          acct_info.norm_pages++;
>> -- 
>> 2.9.3
>
> I think I'm pretty OK with this; but we'll see what it looks like
> after you think about Paolo's suggestion; it does feel like it should
> be possible to do the locking etc simpler; I just don't know how.

Locking can be simpler, but the problem is speed :-(
Paolo's suggestion has helped.
The fact that our measurement of bandwidth is lame hasn't :-(

Later, Juan.
Dr. David Alan Gilbert Feb. 14, 2017, 11:58 a.m. UTC | #5
* Juan Quintela (quintela@redhat.com) wrote:
> "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> > * Juan Quintela (quintela@redhat.com) wrote:
> >> We make the locking and the transfer of information specific, even if we
> >> are still transmiting things through the main thread.
> >> 
> >> Signed-off-by: Juan Quintela <quintela@redhat.com>
> >> ---
> >>  migration/ram.c | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
> >>  1 file changed, 52 insertions(+), 1 deletion(-)
> >> 
> >> diff --git a/migration/ram.c b/migration/ram.c
> >> index c71929e..9d7bc64 100644
> >> --- a/migration/ram.c
> >> +++ b/migration/ram.c
> >> @@ -392,17 +392,25 @@ void migrate_compress_threads_create(void)
> >>  /* Multiple fd's */
> >> 
> >>  struct MultiFDSendParams {
> >> +    /* not changed */
> >>      QemuThread thread;
> >>      QIOChannel *c;
> >>      QemuCond cond;
> >>      QemuMutex mutex;
> >> +    /* protected by param mutex */
> >>      bool quit;
> >>      bool started;
> >> +    uint8_t *address;
> >> +    /* protected by multifd mutex */
> >> +    bool done;
> >>  };
> >>  typedef struct MultiFDSendParams MultiFDSendParams;
> >> 
> >>  static MultiFDSendParams *multifd_send;
> >> 
> >> +QemuMutex multifd_send_mutex;
> >> +QemuCond multifd_send_cond;
> >> +
> >>  static void *multifd_send_thread(void *opaque)
> >>  {
> >>      MultiFDSendParams *params = opaque;
> >> @@ -416,7 +424,17 @@ static void *multifd_send_thread(void *opaque)
> >> 
> >>      qemu_mutex_lock(&params->mutex);
> >>      while (!params->quit){
> >> -        qemu_cond_wait(&params->cond, &params->mutex);
> >> +        if (params->address) {
> >> +            params->address = 0;
> >
> > This confused me (I wondered what happens to the 1st block) but
> > I see in the next patch this gets replaced by something more complex;
> > so I suggest just using params->dummy and commented it's about
> > to get replaced.
> 
> if you preffer, I wanted to minimize the change on the next patch,
> otherwise I also have to change the places where I check the value of
> address.
> 

OK, perhaps just adding a comment to say it's going to go in the
next patch would work.

> >> +    qemu_mutex_unlock(&multifd_send_mutex);
> >> +    qemu_mutex_lock(&multifd_send[i].mutex);
> >
> > Having a 'multifd_send_mutex' and a
> >          'multifd_send[i].mutex'
> > is pretty confusing!
> 
> For different reason, I have moved all the
> 
>   multifd_send[i]. to "p->"
> 
> Better?

Maybe!

> >
> >> +    multifd_send[i].address = address;
> >> +    qemu_cond_signal(&multifd_send[i].cond);
> >> +    qemu_mutex_unlock(&multifd_send[i].mutex);
> >> +
> >> +    return 0;
> >> +}
> >> +
> >>  struct MultiFDRecvParams {
> >>      QemuThread thread;
> >>      QIOChannel *c;
> >> @@ -1015,6 +1065,7 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
> >>          *bytes_transferred +=
> >>              save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
> >>          qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
> >> +        multifd_send_page(p);
> >>          *bytes_transferred += TARGET_PAGE_SIZE;
> >>          pages = 1;
> >>          acct_info.norm_pages++;
> >> -- 
> >> 2.9.3
> >
> > I think I'm pretty OK with this; but we'll see what it looks like
> > after you think about Paolo's suggestion; it does feel like it should
> > be possible to do the locking etc simpler; I just don't know how.
> 
> Locking can be simpler, but the problem is being speed :-(
> Paolo suggestion have helped.
> That our meassurement of bandwidth is lame, haven't :-(

Are you sure that your performance problems are anything to do with locking?

Dave

> Later, Juan.
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK

Patch

diff --git a/migration/ram.c b/migration/ram.c
index c71929e..9d7bc64 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -392,17 +392,25 @@  void migrate_compress_threads_create(void)
 /* Multiple fd's */

 struct MultiFDSendParams {
+    /* not changed */
     QemuThread thread;
     QIOChannel *c;
     QemuCond cond;
     QemuMutex mutex;
+    /* protected by param mutex */
     bool quit;
     bool started;
+    uint8_t *address;
+    /* protected by multifd mutex */
+    bool done;
 };
 typedef struct MultiFDSendParams MultiFDSendParams;

 static MultiFDSendParams *multifd_send;

+QemuMutex multifd_send_mutex;
+QemuCond multifd_send_cond;
+
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *params = opaque;
@@ -416,7 +424,17 @@  static void *multifd_send_thread(void *opaque)

     qemu_mutex_lock(&params->mutex);
     while (!params->quit){
-        qemu_cond_wait(&params->cond, &params->mutex);
+        if (params->address) {
+            params->address = 0;
+            qemu_mutex_unlock(&params->mutex);
+            qemu_mutex_lock(&multifd_send_mutex);
+            params->done = true;
+            qemu_cond_signal(&multifd_send_cond);
+            qemu_mutex_unlock(&multifd_send_mutex);
+            qemu_mutex_lock(&params->mutex);
+        } else {
+            qemu_cond_wait(&params->cond, &params->mutex);
+        }
     }
     qemu_mutex_unlock(&params->mutex);

@@ -464,12 +482,16 @@  void migrate_multifd_send_threads_create(void)
     }
     thread_count = migrate_multifd_threads();
     multifd_send = g_new0(MultiFDSendParams, thread_count);
+    qemu_mutex_init(&multifd_send_mutex);
+    qemu_cond_init(&multifd_send_cond);
     for (i = 0; i < thread_count; i++) {
         char thread_name[15];
         qemu_mutex_init(&multifd_send[i].mutex);
         qemu_cond_init(&multifd_send[i].cond);
         multifd_send[i].quit = false;
         multifd_send[i].started = false;
+        multifd_send[i].done = true;
+        multifd_send[i].address = 0;
         multifd_send[i].c = socket_send_channel_create();
         if(!multifd_send[i].c) {
             error_report("Error creating a send channel");
@@ -487,6 +509,34 @@  void migrate_multifd_send_threads_create(void)
     }
 }

+static int multifd_send_page(uint8_t *address)
+{
+    int i, thread_count;
+    bool found = false;
+
+    thread_count = migrate_multifd_threads();
+    qemu_mutex_lock(&multifd_send_mutex);
+    while (!found) {
+        for (i = 0; i < thread_count; i++) {
+            if (multifd_send[i].done) {
+                multifd_send[i].done = false;
+                found = true;
+                break;
+            }
+        }
+        if (!found) {
+            qemu_cond_wait(&multifd_send_cond, &multifd_send_mutex);
+        }
+    }
+    qemu_mutex_unlock(&multifd_send_mutex);
+    qemu_mutex_lock(&multifd_send[i].mutex);
+    multifd_send[i].address = address;
+    qemu_cond_signal(&multifd_send[i].cond);
+    qemu_mutex_unlock(&multifd_send[i].mutex);
+
+    return 0;
+}
+
 struct MultiFDRecvParams {
     QemuThread thread;
     QIOChannel *c;
@@ -1015,6 +1065,7 @@  static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
         *bytes_transferred +=
             save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
         qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
+        multifd_send_page(p);
         *bytes_transferred += TARGET_PAGE_SIZE;
         pages = 1;
         acct_info.norm_pages++;