@@ -519,12 +519,27 @@ static void *multifd_rdma_send_thread(void *opaque)
}
qemu_mutex_unlock(&p->mutex);
+ /* To complete polling(CQE) */
+ while (p->rdma->nb_sent) {
+ ret = qemu_rdma_block_for_wrid(p->rdma, RDMA_WRID_RDMA_WRITE, NULL);
+ if (ret < 0) {
+ error_report("multifd RDMA migration: "
+ "complete polling error!");
+ return NULL;
+ }
+ }
+
/* Send FINISHED to the destination */
head.type = RDMA_CONTROL_REGISTER_FINISHED;
ret = qemu_rdma_exchange_send(p->rdma, &head, NULL, NULL, NULL, NULL);
if (ret < 0) {
+ error_report("multifd RDMA migration: "
+ "receiving remote info!");
return NULL;
}
+
+ /* sync main thread */
+ qemu_sem_post(&p->sem);
}
out:
@@ -96,6 +96,23 @@ static const char *wrid_desc[] = {
static const char *rdma_host_port;
+/*
+ * index of current RDMA channel for multifd
+ */
+static int current_RDMA_index;
+
+/*
+ * Get the multifd RDMA channel used to send data.
+ */
+static int get_multifd_RDMA_channel(void)
+{
+ int thread_count = migrate_multifd_channels();
+ current_RDMA_index++;
+ current_RDMA_index %= thread_count;
+
+ return current_RDMA_index;
+}
+
/*
* Negotiate RDMA capabilities during connection-setup time.
*/
@@ -1328,8 +1345,8 @@ static int qemu_rdma_wait_comp_channel(RDMAContext *rdma)
* completions only need to be recorded, but do not actually
* need further processing.
*/
-static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
- uint32_t *byte_len)
+int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
+ uint32_t *byte_len)
{
int num_cq_events = 0, ret = 0;
struct ibv_cq *cq;
@@ -1731,6 +1748,20 @@ static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
.repeat = 1,
};
+ /* use multifd to send data */
+ if (migrate_use_multifd() && migrate_use_rdma_pin_all()) {
+ int channel = get_multifd_RDMA_channel();
+ int ret = 0;
+ MultiFDSendParams *multifd_send_param = NULL;
+ ret = get_multifd_send_param(channel, &multifd_send_param);
+ if (ret) {
+ error_report("rdma: error getting multifd_send_param(%d)", channel);
+ return -EINVAL;
+ }
+ rdma = multifd_send_param->rdma;
+ block = &(rdma->local_ram_blocks.block[current_index]);
+ }
+
retry:
sge.addr = (uintptr_t)(block->local_host_addr +
(current_addr - block->offset));
@@ -1948,8 +1979,21 @@ static int qemu_rdma_write_flush(QEMUFile *f, RDMAContext *rdma)
}
if (ret == 0) {
- rdma->nb_sent++;
- trace_qemu_rdma_write_flush(rdma->nb_sent);
+ if (migrate_use_multifd() && migrate_use_rdma_pin_all()) {
+ /* The multifd RDMA threads send data */
+ MultiFDSendParams *multifd_send_param = NULL;
+ ret = get_multifd_send_param(current_RDMA_index,
+ &multifd_send_param);
+ if (ret) {
+ error_report("rdma: error getting multifd_send_param(%d)",
+ current_RDMA_index);
+ return ret;
+ }
+ multifd_send_param->rdma->nb_sent++;
+ } else {
+ rdma->nb_sent++;
+ trace_qemu_rdma_write_flush(rdma->nb_sent);
+ }
}
rdma->current_length = 0;
@@ -3758,7 +3802,10 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
ret = qemu_rdma_exchange_send(rdma, &head, NULL, NULL, NULL, NULL);
if (migrate_use_multifd()) {
- /* Inform src send_thread to send FINISHED signal */
+ /*
+ * Inform src send_thread to send FINISHED signal.
+ * Wait for multifd RDMA send threads to poll the CQE.
+ */
int i;
int thread_count = migrate_multifd_channels();
MultiFDSendParams *multifd_send_param = NULL;
@@ -3770,6 +3817,7 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
}
qemu_sem_post(&multifd_send_param->sem_sync);
+ qemu_sem_wait(&multifd_send_param->sem);
}
}
@@ -281,6 +281,8 @@ int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
int *resp_idx,
int (*callback)(RDMAContext *rdma));
int qemu_rdma_registration(void *opaque);
+int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested,
+ uint32_t *byte_len);
void rdma_start_outgoing_migration(void *opaque, const char *host_port,
Error **errp);
Signed-off-by: Zhimin Feng <fengzhimin1@huawei.com> --- migration/multifd.c | 15 ++++++++++++ migration/rdma.c | 58 +++++++++++++++++++++++++++++++++++++++++---- migration/rdma.h | 2 ++ 3 files changed, 70 insertions(+), 5 deletions(-)