@@ -248,6 +248,19 @@ struct {
int exiting;
} *multifd_send_state;
+int get_multifd_send_param(int id, MultiFDSendParams **param)
+{
+ int ret = 0;
+
+ if (id < 0 || id >= migrate_multifd_channels()) {
+ ret = -1;
+ } else {
+ *param = &(multifd_send_state->params[id]);
+ }
+
+ return ret;
+}
+
/*
* How we use multifd_send_state->pages and channel->pages?
*
@@ -410,6 +423,8 @@ void multifd_save_cleanup(void)
         p->packet_len = 0;
         g_free(p->packet);
         p->packet = NULL;
+        g_free(p->rdma);
+        p->rdma = NULL;
     }
qemu_sem_destroy(&multifd_send_state->channels_ready);
g_free(multifd_send_state->params);
@@ -464,6 +480,27 @@ void multifd_send_sync_main(QEMUFile *f)
trace_multifd_send_sync_main(multifd_send_state->packet_num);
}
+static void *multifd_rdma_send_thread(void *opaque)
+{
+ MultiFDSendParams *p = opaque;
+
+ while (true) {
+ qemu_mutex_lock(&p->mutex);
+ if (p->quit) {
+ qemu_mutex_unlock(&p->mutex);
+ break;
+ }
+ qemu_mutex_unlock(&p->mutex);
+ qemu_sem_wait(&p->sem);
+ }
+
+ qemu_mutex_lock(&p->mutex);
+ p->running = false;
+ qemu_mutex_unlock(&p->mutex);
+
+ return NULL;
+}
+
static void *multifd_send_thread(void *opaque)
{
MultiFDSendParams *p = opaque;
@@ -566,6 +603,12 @@ static void rdma_send_channel_create(MultiFDSendParams *p)
{
Error *local_err = NULL;
+ if (multifd_channel_rdma_connect(p)) {
+ error_setg(&local_err, "multifd: rdma channel %d not established",
+ p->id);
+ return ;
+ }
+
if (p->quit) {
error_setg(&local_err, "multifd: send id %d already quit", p->id);
return ;
@@ -654,6 +697,19 @@ struct {
uint64_t packet_num;
} *multifd_recv_state;
+int get_multifd_recv_param(int id, MultiFDRecvParams **param)
+{
+ int ret = 0;
+
+ if (id < 0 || id >= migrate_multifd_channels()) {
+ ret = -1;
+ } else {
+ *param = &(multifd_recv_state->params[id]);
+ }
+
+ return ret;
+}
+
static void multifd_recv_terminate_threads(Error *err)
{
int i;
@@ -724,6 +780,8 @@ int multifd_load_cleanup(Error **errp)
         p->packet_len = 0;
         g_free(p->packet);
         p->packet = NULL;
+        g_free(p->rdma);
+        p->rdma = NULL;
     }
qemu_sem_destroy(&multifd_recv_state->sem_sync);
g_free(multifd_recv_state->params);
@@ -761,6 +820,27 @@ void multifd_recv_sync_main(void)
trace_multifd_recv_sync_main(multifd_recv_state->packet_num);
}
+static void *multifd_rdma_recv_thread(void *opaque)
+{
+ MultiFDRecvParams *p = opaque;
+
+ while (true) {
+ qemu_mutex_lock(&p->mutex);
+ if (p->quit) {
+ qemu_mutex_unlock(&p->mutex);
+ break;
+ }
+ qemu_mutex_unlock(&p->mutex);
+ qemu_sem_wait(&p->sem_sync);
+ }
+
+ qemu_mutex_lock(&p->mutex);
+ p->running = false;
+ qemu_mutex_unlock(&p->mutex);
+
+ return NULL;
+}
+
static void *multifd_recv_thread(void *opaque)
{
MultiFDRecvParams *p = opaque;
@@ -880,18 +960,23 @@ bool multifd_recv_all_channels_created(void)
 bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
 {
     MultiFDRecvParams *p;
     Error *local_err = NULL;
     int id;

-    id = multifd_recv_initial_packet(ioc, &local_err);
-    if (id < 0) {
-        multifd_recv_terminate_threads(local_err);
-        error_propagate_prepend(errp, local_err,
-                                "failed to receive packet"
-                                " via multifd channel %d: ",
-                                atomic_read(&multifd_recv_state->count));
-        return false;
+    if (migrate_use_rdma()) {
+        id = atomic_read(&multifd_recv_state->count);
+    } else {
+        id = multifd_recv_initial_packet(ioc, &local_err);
+        if (id < 0) {
+            multifd_recv_terminate_threads(local_err);
+            error_propagate_prepend(errp, local_err,
+                                    "failed to receive packet"
+                                    " via multifd channel %d: ",
+                                    atomic_read(&multifd_recv_state->count));
+            return false;
+        }
     }
+
     trace_multifd_recv_new_channel(id);

     p = &multifd_recv_state->params[id];
@@ -903,6 +989,9 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
         return false;
     }
     p->c = ioc;
+    if (migrate_use_rdma()) {
+        p->file = QIO_CHANNEL_RDMA(ioc)->file;
+    }
     object_ref(OBJECT(ioc));
     /* initial packet */
     p->num_packets = 1;
@@ -67,6 +67,10 @@ typedef struct {
char *name;
/* channel thread id */
QemuThread thread;
+ /* RDMAContext channel */
+ RDMAContext *rdma;
+ /* communication channel */
+ QEMUFile *file;
/* communication channel */
QIOChannel *c;
/* sem where to wait for more work */
@@ -108,6 +112,10 @@ typedef struct {
char *name;
/* channel thread id */
QemuThread thread;
+ /* RDMAContext channel */
+ RDMAContext *rdma;
+ /* communication channel */
+ QEMUFile *file;
/* communication channel */
QIOChannel *c;
/* this mutex protects the following parameters */
@@ -137,5 +145,7 @@ typedef struct {
QemuSemaphore sem_sync;
} MultiFDRecvParams;
+int get_multifd_send_param(int id, MultiFDSendParams **param);
+int get_multifd_recv_param(int id, MultiFDRecvParams **param);
#endif
@@ -94,6 +94,8 @@ static const char *wrid_desc[] = {
[RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
};
+static const char *rdma_host_port;
+
/*
* Negotiate RDMA capabilities during connection-setup time.
*/
@@ -3122,6 +3124,33 @@ static int qemu_rdma_accept(RDMAContext *rdma)
qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
NULL,
(void *)(intptr_t)rdma->return_path);
+ } else if (migrate_use_multifd()) {
+ int thread_count;
+ int i;
+ MultiFDRecvParams *multifd_recv_param;
+ RDMAContext *multifd_rdma = NULL;
+ thread_count = migrate_multifd_channels();
+ /* create the multifd channels for RDMA */
+ for (i = 0; i < thread_count; i++) {
+ if (get_multifd_recv_param(i, &multifd_recv_param) < 0) {
+ error_report("rdma: error getting multifd_recv_param(%d)", i);
+ goto err_rdma_dest_wait;
+ }
+
+ if (multifd_recv_param->rdma->cm_id == NULL) {
+ multifd_rdma = multifd_recv_param->rdma;
+ break;
+ }
+ }
+
+ if (multifd_rdma) {
+ qemu_set_fd_handler(rdma->channel->fd,
+ rdma_accept_incoming_migration,
+ NULL, (void *)(intptr_t)multifd_rdma);
+ } else {
+ qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
+ NULL, rdma);
+ }
} else {
qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
NULL, rdma);
@@ -3744,7 +3773,7 @@ static void migration_rdma_process_incoming(QEMUFile *f, Error **errp)
mis->from_src_file = f;
qemu_file_set_blocking(f, false);
- start_migration = migrate_use_multifd();
+ start_migration = !migrate_use_multifd();
} else {
ioc = QIO_CHANNEL(getQIOChannel(f));
/* Multiple connections */
@@ -3847,6 +3876,33 @@ void rdma_start_incoming_migration(const char *host_port, Error **errp)
         goto err;
     }

+    if (migrate_use_multifd()) {
+        int thread_count;
+        int i;
+        int idx;
+        MultiFDRecvParams *multifd_recv_param;
+        thread_count = migrate_multifd_channels();
+        for (i = 0; i < thread_count; i++) {
+            if (get_multifd_recv_param(i, &multifd_recv_param) < 0) {
+                error_report("rdma: error getting multifd_recv_param(%d)", i);
+                goto err;
+            }
+
+            multifd_recv_param->rdma = qemu_rdma_data_init(host_port,
+                                                           &local_err);
+            if (multifd_recv_param->rdma == NULL) {
+                goto err;
+            }
+
+            for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
+                multifd_recv_param->rdma->wr_data[idx].control_len = 0;
+                multifd_recv_param->rdma->wr_data[idx].control_curr = NULL;
+            }
+            /* the CM channel and CM id is shared */
+            multifd_recv_param->rdma->channel = rdma->channel;
+            multifd_recv_param->rdma->listen_id = rdma->listen_id;
+        }
+    }
+
+
qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
NULL, (void *)(intptr_t)rdma);
return;
@@ -3868,6 +3921,10 @@ void rdma_start_outgoing_migration(void *opaque,
goto err;
}
+ if (migrate_use_multifd()) {
+ rdma_host_port = g_strdup(host_port);
+ }
+
ret = qemu_rdma_source_init(rdma,
s->enabled_capabilities[MIGRATION_CAPABILITY_RDMA_PIN_ALL], errp);
@@ -3918,44 +3975,41 @@ err:
     g_free(rdma_return_path);
 }

-void *multifd_rdma_recv_thread(void *opaque)
+int multifd_channel_rdma_connect(void *opaque)
 {
-    MultiFDRecvParams *p = opaque;
+    MultiFDSendParams *p = opaque;
+    Error *local_err = NULL;
+    int ret = 0;

-    while (true) {
-        qemu_mutex_lock(&p->mutex);
-        if (p->quit) {
-            qemu_mutex_unlock(&p->mutex);
-            break;
-        }
-        qemu_mutex_unlock(&p->mutex);
-        qemu_sem_wait(&p->sem_sync);
+    p->rdma = qemu_rdma_data_init(rdma_host_port, &local_err);
+    if (p->rdma == NULL) {
+        ret = -1;
+        goto out;
     }

-    qemu_mutex_lock(&p->mutex);
-    p->running = false;
-    qemu_mutex_unlock(&p->mutex);
-
-    return NULL;
-}
+    ret = qemu_rdma_source_init(p->rdma,
+                                migrate_use_rdma_pin_all(),
+                                &local_err);
+    if (ret) {
+        goto out;
+    }

-void *multifd_rdma_send_thread(void *opaque)
-{
-    MultiFDSendParams *p = opaque;
+    ret = qemu_rdma_connect(p->rdma, &local_err);
+    if (ret) {
+        goto out;
+    }

-    while (true) {
-        qemu_mutex_lock(&p->mutex);
-        if (p->quit) {
-            qemu_mutex_unlock(&p->mutex);
-            break;
-        }
-        qemu_mutex_unlock(&p->mutex);
-        qemu_sem_wait(&p->sem);
+    p->file = qemu_fopen_rdma(p->rdma, "wb");
+    if (p->file == NULL) {
+        ret = -1;
+        goto out;
     }

-    qemu_mutex_lock(&p->mutex);
-    p->running = false;
-    qemu_mutex_unlock(&p->mutex);
+out:
+    if (local_err) {
+        trace_multifd_send_error(p->id);
+        error_free(local_err);
+    }

-    return NULL;
+    return ret;
 }
@@ -263,9 +263,7 @@ struct QIOChannelRDMA {
bool blocking; /* XXX we don't actually honour this yet */
};
-
-void *multifd_rdma_recv_thread(void *opaque);
-void *multifd_rdma_send_thread(void *opaque);
+int multifd_channel_rdma_connect(void *opaque);
void rdma_start_outgoing_migration(void *opaque, const char *host_port,
Error **errp);
On both sides we still don't transmit anything through the new channels; we only establish the RDMA connections. Signed-off-by: Zhimin Feng <fengzhimin1@huawei.com> --- migration/multifd.c | 103 ++++++++++++++++++++++++++++++++++++--- migration/multifd.h | 10 ++++ migration/rdma.c | 115 ++++++++++++++++++++++++++++++++------------ migration/rdma.h | 4 +- 4 files changed, 189 insertions(+), 43 deletions(-)