| Message ID | 1485207141-1941-12-git-send-email-quintela@redhat.com (mailing list archive) |
|---|---|
| State | New, archived |
On 23/01/2017 22:32, Juan Quintela wrote:
> We make the locking and the transfer of information specific, even if we
> are still transmitting things through the main thread.
>
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 52 insertions(+), 1 deletion(-)
>
> diff --git a/migration/ram.c b/migration/ram.c
> index c71929e..9d7bc64 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -392,17 +392,25 @@ void migrate_compress_threads_create(void)
>  /* Multiple fd's */
>
>  struct MultiFDSendParams {
> +    /* not changed */
>      QemuThread thread;
>      QIOChannel *c;
>      QemuCond cond;
>      QemuMutex mutex;
> +    /* protected by param mutex */
>      bool quit;
>      bool started;
> +    uint8_t *address;
> +    /* protected by multifd mutex */
> +    bool done;
>  };
>  typedef struct MultiFDSendParams MultiFDSendParams;
>
>  static MultiFDSendParams *multifd_send;
>
> +QemuMutex multifd_send_mutex;
> +QemuCond multifd_send_cond;

Having n+1 semaphores instead of n+1 cond/mutex pairs could be more
efficient.  See thread-pool.c for an example.

Paolo
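To make Paolo's suggestion concrete, a minimal sketch of the sender loop using semaphores might look like the following. This is purely illustrative, not the actual patch nor the v2 Juan posts later: it assumes MultiFDSendParams gains a `QemuSemaphore sem` member, that the per-thread cond and the `done` flag go away, and that a new global `multifd_send_ready` semaphore counts idle sender threads. Only existing QEMU primitives (`qemu_sem_wait`/`qemu_sem_post` from qemu/thread.h) are used.

```c
/* Rough sketch (not the actual patch) of Paolo's suggestion: give each
 * sender thread a QemuSemaphore instead of its condvar, plus one shared
 * "a sender is idle" semaphore instead of multifd_send_mutex/cond.
 * Assumes MultiFDSendParams has a 'QemuSemaphore sem' member and that
 * the 'done' flag and both condvars are gone. */

static QemuSemaphore multifd_send_ready;   /* counts idle sender threads */

static void *multifd_send_thread(void *opaque)
{
    MultiFDSendParams *p = opaque;

    for (;;) {
        qemu_sem_wait(&p->sem);             /* sleep until work (or quit) */
        qemu_mutex_lock(&p->mutex);
        if (p->quit) {
            qemu_mutex_unlock(&p->mutex);
            break;
        }
        /* ... write the page at p->address to p->c here ... */
        p->address = NULL;
        qemu_mutex_unlock(&p->mutex);
        qemu_sem_post(&multifd_send_ready); /* report ourselves idle again */
    }
    return NULL;
}
```

In this scheme, migrate_multifd_send_threads_create() would initialise `multifd_send_ready` to the thread count with qemu_sem_init(), and each per-thread `sem` to 0.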
* Juan Quintela (quintela@redhat.com) wrote:
> We make the locking and the transfer of information specific, even if we
> are still transmitting things through the main thread.
>
> Signed-off-by: Juan Quintela <quintela@redhat.com>

[...]

> @@ -416,7 +424,17 @@ static void *multifd_send_thread(void *opaque)
>
>      qemu_mutex_lock(&params->mutex);
>      while (!params->quit){
> -        qemu_cond_wait(&params->cond, &params->mutex);
> +        if (params->address) {
> +            params->address = 0;

This confused me (I wondered what happens to the 1st block) but I see in
the next patch this gets replaced by something more complex; so I suggest
just using params->dummy and commenting that it's about to get replaced.

[...]

> +    qemu_mutex_unlock(&multifd_send_mutex);
> +    qemu_mutex_lock(&multifd_send[i].mutex);

Having a 'multifd_send_mutex' and a 'multifd_send[i].mutex' is pretty
confusing!

> +    multifd_send[i].address = address;
> +    qemu_cond_signal(&multifd_send[i].cond);
> +    qemu_mutex_unlock(&multifd_send[i].mutex);
> +
> +    return 0;
> +}

[...]

I think I'm pretty OK with this; but we'll see what it looks like after
you think about Paolo's suggestion; it does feel like it should be
possible to do the locking etc. simpler; I just don't know how.

Dave

--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
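For readers puzzled by the two levels of locking Dave points out, here is the handoff function from the patch again, condensed and annotated; the comments saying which lock protects which field are editorial, the logic itself is unchanged from the patch.

```c
/* Condensed restatement of multifd_send_page() from the patch, with
 * editorial comments on the locking.  Not new code. */
static int multifd_send_page(uint8_t *address)
{
    int i, thread_count = migrate_multifd_threads();
    bool found = false;

    /* multifd_send_mutex/cond protect the per-thread 'done' flags: the
     * main thread scans for an idle sender and sleeps on the condvar
     * when every sender is busy. */
    qemu_mutex_lock(&multifd_send_mutex);
    while (!found) {
        for (i = 0; i < thread_count; i++) {
            if (multifd_send[i].done) {
                multifd_send[i].done = false;   /* claim this sender */
                found = true;
                break;
            }
        }
        if (!found) {
            /* woken by a sender thread when it sets its done flag */
            qemu_cond_wait(&multifd_send_cond, &multifd_send_mutex);
        }
    }
    qemu_mutex_unlock(&multifd_send_mutex);

    /* multifd_send[i].mutex/cond protect that one sender's own state
     * ('quit' and 'address'): publish the page and wake the sender. */
    qemu_mutex_lock(&multifd_send[i].mutex);
    multifd_send[i].address = address;
    qemu_cond_signal(&multifd_send[i].cond);
    qemu_mutex_unlock(&multifd_send[i].mutex);

    return 0;
}
```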
Paolo Bonzini <pbonzini@redhat.com> wrote:
> On 23/01/2017 22:32, Juan Quintela wrote:
>>  static MultiFDSendParams *multifd_send;
>>
>> +QemuMutex multifd_send_mutex;
>> +QemuCond multifd_send_cond;
>
> Having n+1 semaphores instead of n+1 cond/mutex pairs could be more
> efficient.  See thread-pool.c for an example.

Did that.  See next version.

Only partial success: it goes faster and the code is somewhat easier, but
on reception I end up having to add 3 sems per thread (OK, I could move to
only two by reusing them, but still).  On the send side I got speedups; on
the reception side I didn't, and I haven't found the cause yet.

Thanks, Juan.
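Continuing the semaphore sketch shown after Paolo's mail, the main-thread dispatch side might look roughly like this. Again only an illustration, not the "next version" Juan refers to: it assumes `multifd_send_ready` was initialised with a count equal to the number of sender threads, so waiting on it guarantees at least one idle sender whose `address` is still NULL.

```c
/* Hypothetical dispatcher for the semaphore-based scheme sketched
 * earlier.  Because only the (single) main thread claims senders, the
 * ready-count invariant ensures the scan always finds an idle one. */
static int multifd_send_page(uint8_t *address)
{
    int i, thread_count = migrate_multifd_threads();

    qemu_sem_wait(&multifd_send_ready);       /* block until a sender is idle */
    for (i = 0; i < thread_count; i++) {
        qemu_mutex_lock(&multifd_send[i].mutex);
        if (!multifd_send[i].address) {       /* idle: claim it */
            multifd_send[i].address = address;
            qemu_mutex_unlock(&multifd_send[i].mutex);
            qemu_sem_post(&multifd_send[i].sem);  /* wake that sender */
            return 0;
        }
        qemu_mutex_unlock(&multifd_send[i].mutex);
    }
    /* unreachable while the ready-count invariant holds */
    return -1;
}
```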
"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote: > * Juan Quintela (quintela@redhat.com) wrote: >> We make the locking and the transfer of information specific, even if we >> are still transmiting things through the main thread. >> >> Signed-off-by: Juan Quintela <quintela@redhat.com> >> --- >> migration/ram.c | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++++- >> 1 file changed, 52 insertions(+), 1 deletion(-) >> >> diff --git a/migration/ram.c b/migration/ram.c >> index c71929e..9d7bc64 100644 >> --- a/migration/ram.c >> +++ b/migration/ram.c >> @@ -392,17 +392,25 @@ void migrate_compress_threads_create(void) >> /* Multiple fd's */ >> >> struct MultiFDSendParams { >> + /* not changed */ >> QemuThread thread; >> QIOChannel *c; >> QemuCond cond; >> QemuMutex mutex; >> + /* protected by param mutex */ >> bool quit; >> bool started; >> + uint8_t *address; >> + /* protected by multifd mutex */ >> + bool done; >> }; >> typedef struct MultiFDSendParams MultiFDSendParams; >> >> static MultiFDSendParams *multifd_send; >> >> +QemuMutex multifd_send_mutex; >> +QemuCond multifd_send_cond; >> + >> static void *multifd_send_thread(void *opaque) >> { >> MultiFDSendParams *params = opaque; >> @@ -416,7 +424,17 @@ static void *multifd_send_thread(void *opaque) >> >> qemu_mutex_lock(¶ms->mutex); >> while (!params->quit){ >> - qemu_cond_wait(¶ms->cond, ¶ms->mutex); >> + if (params->address) { >> + params->address = 0; > > This confused me (I wondered what happens to the 1st block) but > I see in the next patch this gets replaced by something more complex; > so I suggest just using params->dummy and commented it's about > to get replaced. if you preffer, I wanted to minimize the change on the next patch, otherwise I also have to change the places where I check the value of address. >> + qemu_mutex_unlock(&multifd_send_mutex); >> + qemu_mutex_lock(&multifd_send[i].mutex); > > Having a 'multifd_send_mutex' and a > 'multifd_send[i].mutex' > is pretty confusing! For different reason, I have moved all the multifd_send[i]. to "p->" Better? > >> + multifd_send[i].address = address; >> + qemu_cond_signal(&multifd_send[i].cond); >> + qemu_mutex_unlock(&multifd_send[i].mutex); >> + >> + return 0; >> +} >> + >> struct MultiFDRecvParams { >> QemuThread thread; >> QIOChannel *c; >> @@ -1015,6 +1065,7 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss, >> *bytes_transferred += >> save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE); >> qemu_put_buffer(f, p, TARGET_PAGE_SIZE); >> + multifd_send_page(p); >> *bytes_transferred += TARGET_PAGE_SIZE; >> pages = 1; >> acct_info.norm_pages++; >> -- >> 2.9.3 > > I think I'm pretty OK with this; but we'll see what it looks like > after you think about Paolo's suggestion; it does feel like it should > be possible to do the locking etc simpler; I just don't know how. Locking can be simpler, but the problem is being speed :-( Paolo suggestion have helped. That our meassurement of bandwidth is lame, haven't :-( Later, Juan.
* Juan Quintela (quintela@redhat.com) wrote:
> "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
>> This confused me (I wondered what happens to the 1st block) but I see in
>> the next patch this gets replaced by something more complex; so I suggest
>> just using params->dummy and commenting that it's about to get replaced.
>
> If you prefer.  I wanted to minimize the change in the next patch;
> otherwise I also have to change the places where I check the value of
> address.

OK, perhaps just adding a comment to say it's going to go in the next
patch would work.

>> Having a 'multifd_send_mutex' and a 'multifd_send[i].mutex' is pretty
>> confusing!
>
> For a different reason, I have moved all the 'multifd_send[i].' accesses
> to 'p->'.  Better?

Maybe!

>> I think I'm pretty OK with this; but we'll see what it looks like after
>> you think about Paolo's suggestion; it does feel like it should be
>> possible to do the locking etc. simpler; I just don't know how.
>
> Locking can be simpler, but the problem is speed :-(  Paolo's suggestion
> has helped; the fact that our measurement of bandwidth is lame hasn't :-(

Are you sure that your performance problems are anything to do with
locking?

Dave

> Later, Juan.

--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
```diff
diff --git a/migration/ram.c b/migration/ram.c
index c71929e..9d7bc64 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -392,17 +392,25 @@ void migrate_compress_threads_create(void)
 /* Multiple fd's */
 
 struct MultiFDSendParams {
+    /* not changed */
     QemuThread thread;
     QIOChannel *c;
     QemuCond cond;
     QemuMutex mutex;
+    /* protected by param mutex */
     bool quit;
     bool started;
+    uint8_t *address;
+    /* protected by multifd mutex */
+    bool done;
 };
 typedef struct MultiFDSendParams MultiFDSendParams;
 
 static MultiFDSendParams *multifd_send;
 
+QemuMutex multifd_send_mutex;
+QemuCond multifd_send_cond;
+
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *params = opaque;
@@ -416,7 +424,17 @@ static void *multifd_send_thread(void *opaque)
 
     qemu_mutex_lock(&params->mutex);
     while (!params->quit){
-        qemu_cond_wait(&params->cond, &params->mutex);
+        if (params->address) {
+            params->address = 0;
+            qemu_mutex_unlock(&params->mutex);
+            qemu_mutex_lock(&multifd_send_mutex);
+            params->done = true;
+            qemu_cond_signal(&multifd_send_cond);
+            qemu_mutex_unlock(&multifd_send_mutex);
+            qemu_mutex_lock(&params->mutex);
+        } else {
+            qemu_cond_wait(&params->cond, &params->mutex);
+        }
     }
     qemu_mutex_unlock(&params->mutex);
 
@@ -464,12 +482,16 @@ void migrate_multifd_send_threads_create(void)
     }
     thread_count = migrate_multifd_threads();
     multifd_send = g_new0(MultiFDSendParams, thread_count);
+    qemu_mutex_init(&multifd_send_mutex);
+    qemu_cond_init(&multifd_send_cond);
     for (i = 0; i < thread_count; i++) {
         char thread_name[15];
         qemu_mutex_init(&multifd_send[i].mutex);
         qemu_cond_init(&multifd_send[i].cond);
         multifd_send[i].quit = false;
         multifd_send[i].started = false;
+        multifd_send[i].done = true;
+        multifd_send[i].address = 0;
         multifd_send[i].c = socket_send_channel_create();
         if(!multifd_send[i].c) {
             error_report("Error creating a send channel");
@@ -487,6 +509,34 @@ void migrate_multifd_send_threads_create(void)
     }
 }
 
+static int multifd_send_page(uint8_t *address)
+{
+    int i, thread_count;
+    bool found = false;
+
+    thread_count = migrate_multifd_threads();
+    qemu_mutex_lock(&multifd_send_mutex);
+    while (!found) {
+        for (i = 0; i < thread_count; i++) {
+            if (multifd_send[i].done) {
+                multifd_send[i].done = false;
+                found = true;
+                break;
+            }
+        }
+        if (!found) {
+            qemu_cond_wait(&multifd_send_cond, &multifd_send_mutex);
+        }
+    }
+    qemu_mutex_unlock(&multifd_send_mutex);
+    qemu_mutex_lock(&multifd_send[i].mutex);
+    multifd_send[i].address = address;
+    qemu_cond_signal(&multifd_send[i].cond);
+    qemu_mutex_unlock(&multifd_send[i].mutex);
+
+    return 0;
+}
+
 struct MultiFDRecvParams {
     QemuThread thread;
     QIOChannel *c;
@@ -1015,6 +1065,7 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
         *bytes_transferred +=
             save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
         qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
+        multifd_send_page(p);
         *bytes_transferred += TARGET_PAGE_SIZE;
         pages = 1;
         acct_info.norm_pages++;
```
We make the locking and the transfer of information specific, even if we
are still transmitting things through the main thread.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 53 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 52 insertions(+), 1 deletion(-)