@@ -527,7 +527,8 @@ void migrate_multifd_send_threads_create(void)
}
}
-static int multifd_send_page(uint8_t *address)
+
+static uint16_t multifd_send_page(uint8_t *address, bool last_page)
{
int i, j, thread_count;
static MultiFDPages pages;
@@ -541,8 +542,10 @@ static int multifd_send_page(uint8_t *address)
pages.address[pages.num] = address;
pages.num++;
- if (pages.num < (pages.size - 1)) {
- return UINT16_MAX;
+ if (!last_page) {
+ if (pages.num < (pages.size - 1)) {
+ return UINT16_MAX;
+ }
}
thread_count = migrate_multifd_threads();
@@ -564,16 +567,21 @@ static int multifd_send_page(uint8_t *address)
qemu_mutex_unlock(&multifd_send[i].mutex);
qemu_sem_post(&multifd_send[i].sem);
- return 0;
+ return i;
}
struct MultiFDRecvParams {
+ /* not changed */
QemuThread thread;
QIOChannel *c;
QemuSemaphore init;
+ QemuSemaphore ready;
QemuSemaphore sem;
QemuMutex mutex;
+ /* protected by param mutex */
bool quit;
+ MultiFDPages pages;
+ bool done;
};
typedef struct MultiFDRecvParams MultiFDRecvParams;
@@ -586,6 +594,7 @@ static void *multifd_recv_thread(void *opaque)
qio_channel_read(params->c, &start, 1, &error_abort);
qemu_sem_post(&params->init);
+ qemu_sem_post(&params->ready);
while (true) {
qemu_mutex_lock(&params->mutex);
@@ -593,6 +602,13 @@ static void *multifd_recv_thread(void *opaque)
qemu_mutex_unlock(&params->mutex);
break;
}
+ if (params->pages.num) {
+ params->pages.num = 0;
+ params->done = true;
+ qemu_mutex_unlock(&params->mutex);
+ qemu_sem_post(&params->ready);
+ continue;
+ }
qemu_mutex_unlock(&params->mutex);
qemu_sem_wait(&params->sem);
}
@@ -652,7 +668,10 @@ void migrate_multifd_recv_threads_create(void)
qemu_mutex_init(&p->mutex);
qemu_sem_init(&p->sem, 0);
qemu_sem_init(&p->init, 0);
+ qemu_sem_init(&p->ready, 0);
p->quit = false;
+ p->done = false;
+ multifd_init_group(&p->pages);
p->c = socket_recv_channel_create();
if (!p->c) {
@@ -666,6 +685,42 @@ void migrate_multifd_recv_threads_create(void)
socket_recv_channel_close_listening();
}
+static void multifd_recv_page(uint8_t *address, uint16_t fd_num)
+{
+ int i, thread_count;
+ MultiFDRecvParams *params;
+ static MultiFDPages pages;
+ static bool once;
+
+ if (!once) {
+ multifd_init_group(&pages);
+ once = true;
+ }
+
+ pages.address[pages.num] = address;
+ pages.num++;
+
+ if (fd_num == UINT16_MAX) {
+ return;
+ }
+
+ thread_count = migrate_multifd_threads();
+ assert(fd_num < thread_count);
+ params = &multifd_recv[fd_num];
+
+ qemu_sem_wait(&params->ready);
+
+ qemu_mutex_lock(&params->mutex);
+ params->done = false;
+ for (i = 0; i < pages.num; i++) {
+ params->pages.address[i] = pages.address[i];
+ }
+ params->pages.num = pages.num;
+ pages.num = 0;
+ qemu_mutex_unlock(&params->mutex);
+ qemu_sem_post(&params->sem);
+}
+
/**
* save_page_header: Write page header to wire
*
@@ -1085,6 +1140,7 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
bool last_stage, uint64_t *bytes_transferred)
{
int pages;
+ uint16_t fd_num;
uint8_t *p;
RAMBlock *block = pss->block;
ram_addr_t offset = pss->offset;
@@ -1098,8 +1154,10 @@ static int ram_multifd_page(QEMUFile *f, PageSearchStatus *pss,
if (pages == -1) {
*bytes_transferred +=
save_page_header(f, block, offset | RAM_SAVE_FLAG_MULTIFD_PAGE);
+ fd_num = multifd_send_page(p, migration_dirty_pages == 1);
+ qemu_put_be16(f, fd_num);
+ *bytes_transferred += 2; /* size of fd_num */
qemu_put_buffer(f, p, TARGET_PAGE_SIZE);
- multifd_send_page(p);
*bytes_transferred += TARGET_PAGE_SIZE;
pages = 1;
acct_info.norm_pages++;
@@ -2813,6 +2871,7 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
while (!postcopy_running && !ret && !(flags & RAM_SAVE_FLAG_EOS)) {
ram_addr_t addr, total_ram_bytes;
void *host = NULL;
+ uint16_t fd_num;
uint8_t ch;
addr = qemu_get_be64(f);
@@ -2910,6 +2969,8 @@ static int ram_load(QEMUFile *f, void *opaque, int version_id)
break;
case RAM_SAVE_FLAG_MULTIFD_PAGE:
+ fd_num = qemu_get_be16(f);
+ multifd_recv_page(host, fd_num);
qemu_get_buffer(f, host, TARGET_PAGE_SIZE);
break;