| Message ID | 1485207141-1941-15-git-send-email-quintela@redhat.com (mailing list archive) |
|---|---|
| State | New, archived |
On 23/01/2017 22:32, Juan Quintela wrote:
>      bool started;
> +    multifd_pages_t pages;
> +    /* proteced by multifd mutex */
> +    bool done;
>  };
>  typedef struct MultiFDRecvParams MultiFDRecvParams;
>
>  static MultiFDRecvParams *multifd_recv;
>
> +QemuMutex multifd_recv_mutex;
> +QemuCond multifd_recv_cond;
> +
>  static void *multifd_recv_thread(void *opaque)

Same here (and it should also be wrapped in an abstract data type).

Paolo
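As a rough illustration of Paolo's "abstract data type" suggestion, the done flag and the mutex/condvar protecting it could be hidden behind a small helper type along the following lines. This is only a sketch: the type and function names (MultiFDDoneFlag, multifd_done_flag_*) are invented here and are not part of the posted series; only the QEMU thread primitives it calls (qemu_mutex_init, qemu_cond_wait, and friends) are existing API. Note also that the patch uses a single global mutex/cond shared by all receive threads, while bundling a pair per flag, as below, is just one possible way to encapsulate it.

```c
#include "qemu/osdep.h"
#include "qemu/thread.h"

/* Hypothetical wrapper around the done/mutex/cond trio used in the patch. */
typedef struct MultiFDDoneFlag {
    QemuMutex mutex;
    QemuCond cond;
    bool done;
} MultiFDDoneFlag;

static void multifd_done_flag_init(MultiFDDoneFlag *f, bool done)
{
    qemu_mutex_init(&f->mutex);
    qemu_cond_init(&f->cond);
    f->done = done;
}

/* Main-thread side: block until the worker reports completion, then
 * claim the slot again so the next batch can be handed over. */
static void multifd_done_flag_wait(MultiFDDoneFlag *f)
{
    qemu_mutex_lock(&f->mutex);
    while (!f->done) {
        qemu_cond_wait(&f->cond, &f->mutex);
    }
    f->done = false;
    qemu_mutex_unlock(&f->mutex);
}

/* Worker side: report that the current batch of pages has been handled. */
static void multifd_done_flag_set(MultiFDDoneFlag *f)
{
    qemu_mutex_lock(&f->mutex);
    f->done = true;
    qemu_cond_signal(&f->cond);
    qemu_mutex_unlock(&f->mutex);
}
```

With such a wrapper, multifd_recv_page() would call multifd_done_flag_wait() and multifd_recv_thread() would call multifd_done_flag_set(), keeping the locking rules in one place.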
* Juan Quintela (quintela@redhat.com) wrote:
> We make the locking and the transfer of information specific, even if we
> are still receiving things through the main thread.
>
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/ram.c | 77 +++++++++++++++++++++++++++++++++++++++++++++++++--------
>  1 file changed, 67 insertions(+), 10 deletions(-)
>
> diff --git a/migration/ram.c b/migration/ram.c
> index ca94704..4e530ea 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -523,7 +523,7 @@ void migrate_multifd_send_threads_create(void)
>      }
>  }
>
> -static int multifd_send_page(uint8_t *address)
> +static uint16_t multifd_send_page(uint8_t *address, bool last_page)
>  {
>      int i, j, thread_count;
>      bool found = false;
> @@ -538,8 +538,10 @@ static int multifd_send_page(uint8_t *address)
>      pages.address[pages.num] = address;
>      pages.num++;
>
> -    if (pages.num < (pages.size - 1)) {
> -        return UINT16_MAX;
> +    if (!last_page) {
> +        if (pages.num < (pages.size - 1)) {
> +            return UINT16_MAX;
> +        }
>      }

This should be in the previous patch?
(and the place that adds the last_page parameter below)?

>
>      thread_count = migrate_multifd_threads();
> @@ -570,17 +572,25 @@ static int multifd_send_page(uint8_t *address)
>  }
>
>  struct MultiFDRecvParams {
> +    /* not changed */
>      QemuThread thread;
>      QIOChannel *c;
>      QemuCond cond;
>      QemuMutex mutex;
> +    /* proteced by param mutex */
>      bool quit;
>      bool started;
> +    multifd_pages_t pages;
> +    /* proteced by multifd mutex */
> +    bool done;
>  };
>  typedef struct MultiFDRecvParams MultiFDRecvParams;
>
>  static MultiFDRecvParams *multifd_recv;
>
> +QemuMutex multifd_recv_mutex;
> +QemuCond multifd_recv_cond;
> +
>  static void *multifd_recv_thread(void *opaque)
>  {
>      MultiFDRecvParams *params = opaque;
> @@ -594,7 +604,17 @@ static void *multifd_recv_thread(void *opaque)
>
>      qemu_mutex_lock(&params->mutex);
>      while (!params->quit){
> -        qemu_cond_wait(&params->cond, &params->mutex);
> +        if (params->pages.num) {
> +            params->pages.num = 0;
> +            qemu_mutex_unlock(&params->mutex);
> +            qemu_mutex_lock(&multifd_recv_mutex);
> +            params->done = true;
> +            qemu_cond_signal(&multifd_recv_cond);
> +            qemu_mutex_unlock(&multifd_recv_mutex);
> +            qemu_mutex_lock(&params->mutex);
> +        } else {
> +            qemu_cond_wait(&params->cond, &params->mutex);
> +        }
>      }
>      qemu_mutex_unlock(&params->mutex);
>
> @@ -647,8 +667,9 @@ void migrate_multifd_recv_threads_create(void)
>          qemu_cond_init(&multifd_recv[i].cond);
>          multifd_recv[i].quit = false;
>          multifd_recv[i].started = false;
> +        multifd_recv[i].done = true;
> +        multifd_init_group(&multifd_recv[i].pages);
>          multifd_recv[i].c = socket_recv_channel_create();
> -
>          if(!multifd_recv[i].c) {
>              error_report("Error creating a recv channel");
>              exit(0);
> @@ -664,6 +685,45 @@ void migrate_multifd_recv_threads_create(void)
>      }
>  }
>
> +static void multifd_recv_page(uint8_t *address, uint16_t fd_num)
> +{
> +    int i, thread_count;
> +    MultiFDRecvParams *params;
> +    static multifd_pages_t pages;
> +    static bool once = false;
> +
> +    if (!once) {
> +        multifd_init_group(&pages);
> +        once = true;
> +    }
> +
> +    pages.address[pages.num] = address;
> +    pages.num++;
> +
> +    if (fd_num == UINT16_MAX) {
> +        return;
> +    }
> +
> +    thread_count = migrate_multifd_threads();
> +    assert(fd_num < thread_count);
> +    params = &multifd_recv[fd_num];
> +
> +    qemu_mutex_lock(&multifd_recv_mutex);
> +    while (!params->done) {
> +        qemu_cond_wait(&multifd_recv_cond, &multifd_recv_mutex);
> +    }
> +    params->done = false;
> +    qemu_mutex_unlock(&multifd_recv_mutex);
> +    qemu_mutex_lock(&params->mutex);
> +    for(i = 0; i < pages.num; i++) {
> +        params->pages.address[i] = pages.address[i];
> +    }
> +    params->pages.num = pages.num;
> +    pages.num = 0;
> +    qemu_cond_signal(&params->cond);
> +    qemu_mutex_unlock(&params->mutex);
> +}
> +
>  /**
>   * save_page_header: Write page header to wire
>   *
> @@ -1097,7 +1157,7 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
>      if (pages == -1) {
>          *bytes_transferred +=
>              save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
> -        fd_num = multifd_send_page(p);
> +        fd_num = multifd_send_page(p, migration_dirty_pages == 1);
>          qemu_put_be16(f, fd_num);
>          *bytes_transferred += 2; /* size of fd_num */
>          qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
> @@ -2920,10 +2980,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
>
>          case RAM_SAVE_FLAG_MULTIFD_PAGE:
>              fd_num = qemu_get_be16(f);
> -            if (fd_num != 0) {
> -                /* this is yet an unused variable, changed later */
> -                fd_num = fd_num;
> -            }
> +            multifd_recv_page(host, fd_num);

This is going to be quite tricky to fit into ram_load_postcopy
in this form; somehow it's going to have to find addresses to use for place page
and with anything with a page size != target page size it gets messy.

Dave

>              qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
>              break;
>
> --
> 2.9.3
>

--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote: > * Juan Quintela (quintela@redhat.com) wrote: >> We make the locking and the transfer of information specific, even if we >> are still receiving things through the main thread. >> >> Signed-off-by: Juan Quintela <quintela@redhat.com> >> --- >> migration/ram.c | 77 +++++++++++++++++++++++++++++++++++++++++++++++++-------- >> 1 file changed, 67 insertions(+), 10 deletions(-) >> >> diff --git a/migration/ram.c b/migration/ram.c >> index ca94704..4e530ea 100644 >> --- a/migration/ram.c >> +++ b/migration/ram.c >> @@ -523,7 +523,7 @@ void migrate_multifd_send_threads_create(void) >> } >> } >> >> -static int multifd_send_page(uint8_t *address) >> +static uint16_t multifd_send_page(uint8_t *address, bool last_page) >> { >> int i, j, thread_count; >> bool found = false; >> @@ -538,8 +538,10 @@ static int multifd_send_page(uint8_t *address) >> pages.address[pages.num] = address; >> pages.num++; >> >> - if (pages.num < (pages.size - 1)) { >> - return UINT16_MAX; >> + if (!last_page) { >> + if (pages.num < (pages.size - 1)) { >> + return UINT16_MAX; >> + } >> } > > This should be in the previous patch? > (and the place that adds the last_page parameter below)? ok. >> @@ -2920,10 +2980,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id) >> >> case RAM_SAVE_FLAG_MULTIFD_PAGE: >> fd_num = qemu_get_be16(f); >> - if (fd_num != 0) { >> - /* this is yet an unused variable, changed later */ >> - fd_num = fd_num; >> - } >> + multifd_recv_page(host, fd_num); > > This is going to be quite tricky to fit into ram_load_postcopy > in this form; somehow it's going to have to find addresses to use for place page > and with anything with a page size != target page size it gets messy. What do you have in mind? Later, Juan.
* Juan Quintela (quintela@redhat.com) wrote:
> "Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> > * Juan Quintela (quintela@redhat.com) wrote:
> >> We make the locking and the transfer of information specific, even if we
> >> are still receiving things through the main thread.
> >>
> >> Signed-off-by: Juan Quintela <quintela@redhat.com>
> >> ---
> >>  migration/ram.c | 77 +++++++++++++++++++++++++++++++++++++++++++++++++--------
> >>  1 file changed, 67 insertions(+), 10 deletions(-)
> >>
> >> diff --git a/migration/ram.c b/migration/ram.c
> >> index ca94704..4e530ea 100644
> >> --- a/migration/ram.c
> >> +++ b/migration/ram.c
> >> @@ -523,7 +523,7 @@ void migrate_multifd_send_threads_create(void)
> >>      }
> >>  }
> >>
> >> -static int multifd_send_page(uint8_t *address)
> >> +static uint16_t multifd_send_page(uint8_t *address, bool last_page)
> >>  {
> >>      int i, j, thread_count;
> >>      bool found = false;
> >> @@ -538,8 +538,10 @@ static int multifd_send_page(uint8_t *address)
> >>      pages.address[pages.num] = address;
> >>      pages.num++;
> >>
> >> -    if (pages.num < (pages.size - 1)) {
> >> -        return UINT16_MAX;
> >> +    if (!last_page) {
> >> +        if (pages.num < (pages.size - 1)) {
> >> +            return UINT16_MAX;
> >> +        }
> >>      }
> >
> > This should be in the previous patch?
> > (and the place that adds the last_page parameter below)?
>
> ok.
>
> >> @@ -2920,10 +2980,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
> >>
> >>          case RAM_SAVE_FLAG_MULTIFD_PAGE:
> >>              fd_num = qemu_get_be16(f);
> >> -            if (fd_num != 0) {
> >> -                /* this is yet an unused variable, changed later */
> >> -                fd_num = fd_num;
> >> -            }
> >> +            multifd_recv_page(host, fd_num);
> >
> > This is going to be quite tricky to fit into ram_load_postcopy
> > in this form; somehow it's going to have to find addresses to use for place page
> > and with anything with a page size != target page size it gets messy.
>
> What do you have in mind?

The problem is that for postcopy we read the data into a temporary buffer and
then call a system call to 'place' the page atomically in memory.  At the
moment there's a single temporary buffer; for x86 this is easy - read a page
into buffer; place it.  For Power/ARM or hugepages we read consecutive
target-pages into the temporary buffer and at the end of the page place the
whole host/huge page at once.

If you're reading multiple pages in parallel then you're going to need to take
care with multiple temporary buffers; having one hugepage/hostpage per fd
would probably be the easiest way.

A related thing to take care of is that when switching to postcopy mode we
probably need to take care to sync all of the fds to make sure any outstanding
RAM load has completed before we start doing any postcopy magic.

Dave

> Later, Juan.

--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
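For the "sync all of the fds" step Dave mentions, one possible shape (again just a sketch, not something in the posted series) is a helper that waits until every receive thread has marked its outstanding batch as done before the destination switches into postcopy mode. It reuses the multifd_recv array, multifd_recv_mutex, multifd_recv_cond and the per-thread done flag introduced by this patch; the function name multifd_recv_sync_all is made up for illustration.

```c
/* Wait until every multifd receive thread has drained its current batch.
 * Intended to run on the destination right before postcopy starts, so no
 * RAM load is still in flight on the side channels. */
static void multifd_recv_sync_all(void)
{
    int i;
    int thread_count = migrate_multifd_threads();

    qemu_mutex_lock(&multifd_recv_mutex);
    for (i = 0; i < thread_count; i++) {
        /* done == true means this thread has no pages in flight */
        while (!multifd_recv[i].done) {
            qemu_cond_wait(&multifd_recv_cond, &multifd_recv_mutex);
        }
    }
    qemu_mutex_unlock(&multifd_recv_mutex);
}
```

The per-fd temporary buffer ("one hugepage/hostpage per fd") would be a separate change on top of this, since each thread would then need its own staging area sized to the host page rather than sharing the single postcopy buffer.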
diff --git a/migration/ram.c b/migration/ram.c
index ca94704..4e530ea 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -523,7 +523,7 @@ void migrate_multifd_send_threads_create(void)
     }
 }
 
-static int multifd_send_page(uint8_t *address)
+static uint16_t multifd_send_page(uint8_t *address, bool last_page)
 {
     int i, j, thread_count;
     bool found = false;
@@ -538,8 +538,10 @@ static int multifd_send_page(uint8_t *address)
     pages.address[pages.num] = address;
     pages.num++;
 
-    if (pages.num < (pages.size - 1)) {
-        return UINT16_MAX;
+    if (!last_page) {
+        if (pages.num < (pages.size - 1)) {
+            return UINT16_MAX;
+        }
     }
 
     thread_count = migrate_multifd_threads();
@@ -570,17 +572,25 @@ static int multifd_send_page(uint8_t *address)
 }
 
 struct MultiFDRecvParams {
+    /* not changed */
     QemuThread thread;
     QIOChannel *c;
     QemuCond cond;
     QemuMutex mutex;
+    /* proteced by param mutex */
     bool quit;
     bool started;
+    multifd_pages_t pages;
+    /* proteced by multifd mutex */
+    bool done;
 };
 typedef struct MultiFDRecvParams MultiFDRecvParams;
 
 static MultiFDRecvParams *multifd_recv;
 
+QemuMutex multifd_recv_mutex;
+QemuCond multifd_recv_cond;
+
 static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *params = opaque;
@@ -594,7 +604,17 @@ static void *multifd_recv_thread(void *opaque)
 
     qemu_mutex_lock(&params->mutex);
     while (!params->quit){
-        qemu_cond_wait(&params->cond, &params->mutex);
+        if (params->pages.num) {
+            params->pages.num = 0;
+            qemu_mutex_unlock(&params->mutex);
+            qemu_mutex_lock(&multifd_recv_mutex);
+            params->done = true;
+            qemu_cond_signal(&multifd_recv_cond);
+            qemu_mutex_unlock(&multifd_recv_mutex);
+            qemu_mutex_lock(&params->mutex);
+        } else {
+            qemu_cond_wait(&params->cond, &params->mutex);
+        }
     }
     qemu_mutex_unlock(&params->mutex);
 
@@ -647,8 +667,9 @@ void migrate_multifd_recv_threads_create(void)
         qemu_cond_init(&multifd_recv[i].cond);
         multifd_recv[i].quit = false;
         multifd_recv[i].started = false;
+        multifd_recv[i].done = true;
+        multifd_init_group(&multifd_recv[i].pages);
         multifd_recv[i].c = socket_recv_channel_create();
-
         if(!multifd_recv[i].c) {
             error_report("Error creating a recv channel");
             exit(0);
@@ -664,6 +685,45 @@ void migrate_multifd_recv_threads_create(void)
     }
 }
 
+static void multifd_recv_page(uint8_t *address, uint16_t fd_num)
+{
+    int i, thread_count;
+    MultiFDRecvParams *params;
+    static multifd_pages_t pages;
+    static bool once = false;
+
+    if (!once) {
+        multifd_init_group(&pages);
+        once = true;
+    }
+
+    pages.address[pages.num] = address;
+    pages.num++;
+
+    if (fd_num == UINT16_MAX) {
+        return;
+    }
+
+    thread_count = migrate_multifd_threads();
+    assert(fd_num < thread_count);
+    params = &multifd_recv[fd_num];
+
+    qemu_mutex_lock(&multifd_recv_mutex);
+    while (!params->done) {
+        qemu_cond_wait(&multifd_recv_cond, &multifd_recv_mutex);
+    }
+    params->done = false;
+    qemu_mutex_unlock(&multifd_recv_mutex);
+    qemu_mutex_lock(&params->mutex);
+    for(i = 0; i < pages.num; i++) {
+        params->pages.address[i] = pages.address[i];
+    }
+    params->pages.num = pages.num;
+    pages.num = 0;
+    qemu_cond_signal(&params->cond);
+    qemu_mutex_unlock(&params->mutex);
+}
+
 /**
  * save_page_header: Write page header to wire
  *
@@ -1097,7 +1157,7 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
     if (pages == -1) {
         *bytes_transferred +=
             save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
-        fd_num = multifd_send_page(p);
+        fd_num = multifd_send_page(p, migration_dirty_pages == 1);
         qemu_put_be16(f, fd_num);
         *bytes_transferred += 2; /* size of fd_num */
         qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
@@ -2920,10 +2980,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
 
         case RAM_SAVE_FLAG_MULTIFD_PAGE:
             fd_num = qemu_get_be16(f);
-            if (fd_num != 0) {
-                /* this is yet an unused variable, changed later */
-                fd_num = fd_num;
-            }
+            multifd_recv_page(host, fd_num);
             qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
             break;
We make the locking and the transfer of information specific, even if we
are still receiving things through the main thread.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 77 +++++++++++++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 67 insertions(+), 10 deletions(-)
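For readers who want to see the handshake this patch introduces in isolation, below is a distilled, self-contained version of the main-thread/worker handoff. It is an illustration only, not QEMU code: it uses plain pthreads instead of QEMU's wrappers, integers instead of page addresses, a fixed round-robin dispatch instead of the fd_num carried on the wire, and invented names throughout. The locking scheme, however, mirrors the patch: a per-worker mutex/cond protects that worker's page batch, and one global mutex/cond protects the per-worker done flag. Build with `cc -pthread handshake.c -o handshake`.

```c
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

#define NUM_WORKERS 2
#define BATCH_SIZE  4

typedef struct Worker {
    pthread_t thread;
    pthread_mutex_t mutex;   /* protects batch[], num and quit */
    pthread_cond_t cond;
    int batch[BATCH_SIZE];
    int num;
    bool quit;
    bool done;               /* protected by recv_mutex, not by mutex */
} Worker;

static Worker workers[NUM_WORKERS];
static pthread_mutex_t recv_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t recv_cond = PTHREAD_COND_INITIALIZER;

/* Worker: consume a batch, then report completion through the shared
 * done flag, like multifd_recv_thread() does in the patch. */
static void *worker_thread(void *opaque)
{
    Worker *w = opaque;

    pthread_mutex_lock(&w->mutex);
    while (!w->quit) {
        if (w->num) {
            printf("worker %ld got %d pages\n", (long)(w - workers), w->num);
            w->num = 0;
            pthread_mutex_unlock(&w->mutex);
            pthread_mutex_lock(&recv_mutex);
            w->done = true;
            pthread_cond_signal(&recv_cond);
            pthread_mutex_unlock(&recv_mutex);
            pthread_mutex_lock(&w->mutex);
        } else {
            pthread_cond_wait(&w->cond, &w->mutex);
        }
    }
    pthread_mutex_unlock(&w->mutex);
    return NULL;
}

/* Main-thread side, mirroring multifd_recv_page(): wait until the chosen
 * worker is idle (done), then hand it a full batch and wake it up. */
static void dispatch_batch(int idx, const int *pages, int num)
{
    Worker *w = &workers[idx];
    int i;

    pthread_mutex_lock(&recv_mutex);
    while (!w->done) {
        pthread_cond_wait(&recv_cond, &recv_mutex);
    }
    w->done = false;
    pthread_mutex_unlock(&recv_mutex);

    pthread_mutex_lock(&w->mutex);
    for (i = 0; i < num; i++) {
        w->batch[i] = pages[i];
    }
    w->num = num;
    pthread_cond_signal(&w->cond);
    pthread_mutex_unlock(&w->mutex);
}

int main(void)
{
    int pages[BATCH_SIZE] = { 1, 2, 3, 4 };
    int i, j;

    for (i = 0; i < NUM_WORKERS; i++) {
        pthread_mutex_init(&workers[i].mutex, NULL);
        pthread_cond_init(&workers[i].cond, NULL);
        workers[i].done = true;   /* an idle worker starts out "done" */
        pthread_create(&workers[i].thread, NULL, worker_thread, &workers[i]);
    }

    for (j = 0; j < 4; j++) {
        dispatch_batch(j % NUM_WORKERS, pages, BATCH_SIZE);
    }

    for (i = 0; i < NUM_WORKERS; i++) {
        pthread_mutex_lock(&recv_mutex);
        while (!workers[i].done) {          /* drain the last batch */
            pthread_cond_wait(&recv_cond, &recv_mutex);
        }
        pthread_mutex_unlock(&recv_mutex);
        pthread_mutex_lock(&workers[i].mutex);
        workers[i].quit = true;             /* then ask the worker to exit */
        pthread_cond_signal(&workers[i].cond);
        pthread_mutex_unlock(&workers[i].mutex);
        pthread_join(workers[i].thread, NULL);
    }
    return 0;
}
```

Because the batch counter is always checked under the worker's own mutex, a signal sent while the worker is still busy is never lost, which is the same property the patch relies on.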