Message ID | 20220609073305.142515-5-het.gala@nutanix.com (mailing list archive) |
---|---|
State | New, archived |
Headers | show |
Series | Multiple interface support on top of Multi-FD | expand |
* Het Gala (het.gala@nutanix.com) wrote: > i) Dynamically decide appropriate source and destination ip pairs for the > corresponding multi-FD channel to be connected. > > ii) Removed the support for setting the number of multi-fd channels from qmp > commands. As now all multiFD parameters will be passed via qmp: migrate > command or incoming flag itself. We can't do that, because it's part of the API already; what you'll need to do is check that the number of entries in your list corresponds to the value set there and error if it's different. Dave > Suggested-by: Manish Mishra <manish.mishra@nutanix.com> > Signed-off-by: Het Gala <het.gala@nutanix.com> > --- > migration/migration.c | 15 --------------- > migration/migration.h | 1 - > migration/multifd.c | 42 +++++++++++++++++++++--------------------- > migration/socket.c | 42 +++++++++++++++++++++++++++++++++--------- > migration/socket.h | 4 +++- > monitor/hmp-cmds.c | 4 ---- > qapi/migration.json | 6 ------ > 7 files changed, 57 insertions(+), 57 deletions(-) > > diff --git a/migration/migration.c b/migration/migration.c > index 9b0ad732e7..57dd4494b4 100644 > --- a/migration/migration.c > +++ b/migration/migration.c > @@ -1585,9 +1585,6 @@ static void migrate_params_test_apply(MigrateSetParameters *params, > if (params->has_block_incremental) { > dest->block_incremental = params->block_incremental; > } > - if (params->has_multifd_channels) { > - dest->multifd_channels = params->multifd_channels; > - } > if (params->has_multifd_compression) { > dest->multifd_compression = params->multifd_compression; > } > @@ -1702,9 +1699,6 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp) > if (params->has_block_incremental) { > s->parameters.block_incremental = params->block_incremental; > } > - if (params->has_multifd_channels) { > - s->parameters.multifd_channels = params->multifd_channels; > - } > if (params->has_multifd_compression) { > s->parameters.multifd_compression = params->multifd_compression; > } > @@ -2686,15 +2680,6 @@ bool migrate_pause_before_switchover(void) > MIGRATION_CAPABILITY_PAUSE_BEFORE_SWITCHOVER]; > } > > -int migrate_multifd_channels(void) > -{ > - MigrationState *s; > - > - s = migrate_get_current(); > - > - return s->parameters.multifd_channels; > -} > - > MultiFDCompression migrate_multifd_compression(void) > { > MigrationState *s; > diff --git a/migration/migration.h b/migration/migration.h > index fa8717ec9e..9464de8ef7 100644 > --- a/migration/migration.h > +++ b/migration/migration.h > @@ -372,7 +372,6 @@ bool migrate_validate_uuid(void); > bool migrate_auto_converge(void); > bool migrate_use_multifd(void); > bool migrate_pause_before_switchover(void); > -int migrate_multifd_channels(void); > MultiFDCompression migrate_multifd_compression(void); > int migrate_multifd_zlib_level(void); > int migrate_multifd_zstd_level(void); > diff --git a/migration/multifd.c b/migration/multifd.c > index 9282ab6aa4..ce017436fb 100644 > --- a/migration/multifd.c > +++ b/migration/multifd.c > @@ -225,7 +225,7 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) > return -1; > } > > - if (msg.id > migrate_multifd_channels()) { > + if (msg.id > total_multifd_channels()) { > error_setg(errp, "multifd: received channel version %u " > "expected %u", msg.version, MULTIFD_VERSION); > return -1; > @@ -410,8 +410,8 @@ static int multifd_send_pages(QEMUFile *f) > * using more channels, so ensure it doesn't overflow if the > * limit is lower now. > */ > - next_channel %= migrate_multifd_channels(); > - for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { > + next_channel %= total_multifd_channels(); > + for (i = next_channel;; i = (i + 1) % total_multifd_channels()) { > p = &multifd_send_state->params[i]; > > qemu_mutex_lock(&p->mutex); > @@ -422,7 +422,7 @@ static int multifd_send_pages(QEMUFile *f) > } > if (!p->pending_job) { > p->pending_job++; > - next_channel = (i + 1) % migrate_multifd_channels(); > + next_channel = (i + 1) % total_multifd_channels(); > break; > } > qemu_mutex_unlock(&p->mutex); > @@ -500,7 +500,7 @@ static void multifd_send_terminate_threads(Error *err) > return; > } > > - for (i = 0; i < migrate_multifd_channels(); i++) { > + for (i = 0; i < total_multifd_channels(); i++) { > MultiFDSendParams *p = &multifd_send_state->params[i]; > > qemu_mutex_lock(&p->mutex); > @@ -521,14 +521,14 @@ void multifd_save_cleanup(void) > return; > } > multifd_send_terminate_threads(NULL); > - for (i = 0; i < migrate_multifd_channels(); i++) { > + for (i = 0; i < total_multifd_channels(); i++) { > MultiFDSendParams *p = &multifd_send_state->params[i]; > > if (p->running) { > qemu_thread_join(&p->thread); > } > } > - for (i = 0; i < migrate_multifd_channels(); i++) { > + for (i = 0; i < total_multifd_channels(); i++) { > MultiFDSendParams *p = &multifd_send_state->params[i]; > Error *local_err = NULL; > > @@ -594,7 +594,7 @@ int multifd_send_sync_main(QEMUFile *f) > > flush_zero_copy = migrate_use_zero_copy_send(); > > - for (i = 0; i < migrate_multifd_channels(); i++) { > + for (i = 0; i < total_multifd_channels(); i++) { > MultiFDSendParams *p = &multifd_send_state->params[i]; > > trace_multifd_send_sync_main_signal(p->id); > @@ -627,7 +627,7 @@ int multifd_send_sync_main(QEMUFile *f) > } > } > } > - for (i = 0; i < migrate_multifd_channels(); i++) { > + for (i = 0; i < total_multifd_channels(); i++) { > MultiFDSendParams *p = &multifd_send_state->params[i]; > > trace_multifd_send_sync_main_wait(p->id); > @@ -903,7 +903,7 @@ int multifd_save_setup(Error **errp) > int thread_count; > uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); > uint8_t i; > - > + int idx; > if (!migrate_use_multifd()) { > return 0; > } > @@ -912,7 +912,7 @@ int multifd_save_setup(Error **errp) > return -1; > } > > - thread_count = migrate_multifd_channels(); > + thread_count = total_multifd_channels(); > multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); > multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); > multifd_send_state->pages = multifd_pages_init(page_count); > @@ -945,8 +945,8 @@ int multifd_save_setup(Error **errp) > } else { > p->write_flags = 0; > } > - > - socket_send_channel_create(multifd_new_send_channel_async, p); > + idx = multifd_index(i); > + socket_send_channel_create(multifd_new_send_channel_async, p, idx); > } > > for (i = 0; i < thread_count; i++) { > @@ -991,7 +991,7 @@ static void multifd_recv_terminate_threads(Error *err) > } > } > > - for (i = 0; i < migrate_multifd_channels(); i++) { > + for (i = 0; i < total_multifd_channels(); i++) { > MultiFDRecvParams *p = &multifd_recv_state->params[i]; > > qemu_mutex_lock(&p->mutex); > @@ -1017,7 +1017,7 @@ int multifd_load_cleanup(Error **errp) > return 0; > } > multifd_recv_terminate_threads(NULL); > - for (i = 0; i < migrate_multifd_channels(); i++) { > + for (i = 0; i < total_multifd_channels(); i++) { > MultiFDRecvParams *p = &multifd_recv_state->params[i]; > > if (p->running) { > @@ -1030,7 +1030,7 @@ int multifd_load_cleanup(Error **errp) > qemu_thread_join(&p->thread); > } > } > - for (i = 0; i < migrate_multifd_channels(); i++) { > + for (i = 0; i < total_multifd_channels(); i++) { > MultiFDRecvParams *p = &multifd_recv_state->params[i]; > > migration_ioc_unregister_yank(p->c); > @@ -1065,13 +1065,13 @@ void multifd_recv_sync_main(void) > if (!migrate_use_multifd()) { > return; > } > - for (i = 0; i < migrate_multifd_channels(); i++) { > + for (i = 0; i < total_multifd_channels(); i++) { > MultiFDRecvParams *p = &multifd_recv_state->params[i]; > > trace_multifd_recv_sync_main_wait(p->id); > qemu_sem_wait(&multifd_recv_state->sem_sync); > } > - for (i = 0; i < migrate_multifd_channels(); i++) { > + for (i = 0; i < total_multifd_channels(); i++) { > MultiFDRecvParams *p = &multifd_recv_state->params[i]; > > WITH_QEMU_LOCK_GUARD(&p->mutex) { > @@ -1166,7 +1166,7 @@ int multifd_load_setup(Error **errp) > error_setg(errp, "multifd is not supported by current protocol"); > return -1; > } > - thread_count = migrate_multifd_channels(); > + thread_count = total_multifd_channels(); > multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); > multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); > qatomic_set(&multifd_recv_state->count, 0); > @@ -1204,7 +1204,7 @@ int multifd_load_setup(Error **errp) > > bool multifd_recv_all_channels_created(void) > { > - int thread_count = migrate_multifd_channels(); > + int thread_count = total_multifd_channels(); > > if (!migrate_use_multifd()) { > return true; > @@ -1259,5 +1259,5 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp) > QEMU_THREAD_JOINABLE); > qatomic_inc(&multifd_recv_state->count); > return qatomic_read(&multifd_recv_state->count) == > - migrate_multifd_channels(); > + total_multifd_channels(); > } > diff --git a/migration/socket.c b/migration/socket.c > index d0cb7cc6a6..c0ac6dbbe2 100644 > --- a/migration/socket.c > +++ b/migration/socket.c > @@ -28,9 +28,6 @@ > #include "trace.h" > > > -struct SocketOutgoingArgs { > - SocketAddress *saddr; > -} outgoing_args; > > struct SocketArgs { > struct SrcDestAddr data; > @@ -43,20 +40,47 @@ struct OutgoingMigrateParams { > uint64_t total_multifd_channel; > } outgoing_migrate_params; > > -void socket_send_channel_create(QIOTaskFunc f, void *data) > + > +int total_multifd_channels(void) > +{ > + return outgoing_migrate_params.total_multifd_channel; > +} > + > +int multifd_index(int i) > +{ > + int length = outgoing_migrate_params.length; > + int j = 0; > + int runn_sum = 0; > + while (j < length) { > + runn_sum += outgoing_migrate_params.socket_args[j].multifd_channels; > + if (i >= runn_sum) { > + j++; > + } else { > + break; > + } > + } > + return j; > +} > + > +void socket_send_channel_create(QIOTaskFunc f, void *data, int idx) > { > QIOChannelSocket *sioc = qio_channel_socket_new(); > - qio_channel_socket_connect_async(sioc, outgoing_args.saddr, > - f, data, NULL, NULL, NULL); > + qio_channel_socket_connect_async(sioc, > + outgoing_migrate_params.socket_args[idx].data.dst_addr, > + f, data, NULL, NULL, > + outgoing_migrate_params.socket_args[idx].data.src_addr); > } > > int socket_send_channel_destroy(QIOChannel *send) > { > /* Remove channel */ > object_unref(OBJECT(send)); > - if (outgoing_args.saddr) { > - qapi_free_SocketAddress(outgoing_args.saddr); > - outgoing_args.saddr = NULL; > + if (outgoing_migrate_params.socket_args != NULL) { > + g_free(outgoing_migrate_params.socket_args); > + outgoing_migrate_params.socket_args = NULL; > + } > + if (outgoing_migrate_params.length) { > + outgoing_migrate_params.length = 0; > } > > if (outgoing_migrate_params.socket_args != NULL) { > diff --git a/migration/socket.h b/migration/socket.h > index b9e3699167..c8b9252384 100644 > --- a/migration/socket.h > +++ b/migration/socket.h > @@ -27,7 +27,9 @@ struct SrcDestAddr { > SocketAddress *src_addr; > }; > > -void socket_send_channel_create(QIOTaskFunc f, void *data); > +int total_multifd_channels(void); > +int multifd_index(int i); > +void socket_send_channel_create(QIOTaskFunc f, void *data, int idx); > int socket_send_channel_destroy(QIOChannel *send); > > void socket_start_incoming_migration(const char *str, uint8_t number, > diff --git a/monitor/hmp-cmds.c b/monitor/hmp-cmds.c > index 32a6b67d5f..9a3d76d6ba 100644 > --- a/monitor/hmp-cmds.c > +++ b/monitor/hmp-cmds.c > @@ -1281,10 +1281,6 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict) > p->has_block_incremental = true; > visit_type_bool(v, param, &p->block_incremental, &err); > break; > - case MIGRATION_PARAMETER_MULTIFD_CHANNELS: > - p->has_multifd_channels = true; > - visit_type_uint8(v, param, &p->multifd_channels, &err); > - break; > case MIGRATION_PARAMETER_MULTIFD_COMPRESSION: > p->has_multifd_compression = true; > visit_type_MultiFDCompression(v, param, &p->multifd_compression, > diff --git a/qapi/migration.json b/qapi/migration.json > index 62a7b22d19..1b1c6d01d3 100644 > --- a/qapi/migration.json > +++ b/qapi/migration.json > @@ -877,11 +877,6 @@ > # migrated and the destination must already have access to the > # same backing chain as was used on the source. (since 2.10) > # > -# @multifd-channels: Number of channels used to migrate data in > -# parallel. This is the same number that the > -# number of sockets used for migration. The > -# default value is 2 (since 4.0) > -# > # @xbzrle-cache-size: cache size to be used by XBZRLE migration. It > # needs to be a multiple of the target page size > # and a power of 2 > @@ -965,7 +960,6 @@ > '*x-checkpoint-delay': { 'type': 'uint32', > 'features': [ 'unstable' ] }, > '*block-incremental': 'bool', > - '*multifd-channels': 'uint8', > '*xbzrle-cache-size': 'size', > '*max-postcopy-bandwidth': 'size', > '*max-cpu-throttle': 'uint8', > -- > 2.22.3 >
On 17/06/22 12:17 am, Dr. David Alan Gilbert wrote: > * Het Gala (het.gala@nutanix.com) wrote: >> i) Dynamically decide appropriate source and destination ip pairs for the >> corresponding multi-FD channel to be connected. >> >> ii) Removed the support for setting the number of multi-fd channels from qmp >> commands. As now all multiFD parameters will be passed via qmp: migrate >> command or incoming flag itself. > We can't do that, because it's part of the API already; what you'll need > to do is check that the number of entries in your list corresponds to > the value set there and error if it's different. > > Dave > thanks for review David. Yes, we will make sure in V2 that nothing existing breaks. - Manish Mishra >> Suggested-by: Manish Mishra <manish.mishra@nutanix.com> >> Signed-off-by: Het Gala <het.gala@nutanix.com> >> --- >> migration/migration.c | 15 --------------- >> migration/migration.h | 1 - >> migration/multifd.c | 42 +++++++++++++++++++++--------------------- >> migration/socket.c | 42 +++++++++++++++++++++++++++++++++--------- >> migration/socket.h | 4 +++- >> monitor/hmp-cmds.c | 4 ---- >> qapi/migration.json | 6 ------ >> 7 files changed, 57 insertions(+), 57 deletions(-) >> >> diff --git a/migration/migration.c b/migration/migration.c >> index 9b0ad732e7..57dd4494b4 100644 >> --- a/migration/migration.c >> +++ b/migration/migration.c >> @@ -1585,9 +1585,6 @@ static void migrate_params_test_apply(MigrateSetParameters *params, >> if (params->has_block_incremental) { >> dest->block_incremental = params->block_incremental; >> } >> - if (params->has_multifd_channels) { >> - dest->multifd_channels = params->multifd_channels; >> - } >> if (params->has_multifd_compression) { >> dest->multifd_compression = params->multifd_compression; >> } >> @@ -1702,9 +1699,6 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp) >> if (params->has_block_incremental) { >> s->parameters.block_incremental = params->block_incremental; >> } >> - if (params->has_multifd_channels) { >> - s->parameters.multifd_channels = params->multifd_channels; >> - } >> if (params->has_multifd_compression) { >> s->parameters.multifd_compression = params->multifd_compression; >> } >> @@ -2686,15 +2680,6 @@ bool migrate_pause_before_switchover(void) >> MIGRATION_CAPABILITY_PAUSE_BEFORE_SWITCHOVER]; >> } >> >> -int migrate_multifd_channels(void) >> -{ >> - MigrationState *s; >> - >> - s = migrate_get_current(); >> - >> - return s->parameters.multifd_channels; >> -} >> - >> MultiFDCompression migrate_multifd_compression(void) >> { >> MigrationState *s; >> diff --git a/migration/migration.h b/migration/migration.h >> index fa8717ec9e..9464de8ef7 100644 >> --- a/migration/migration.h >> +++ b/migration/migration.h >> @@ -372,7 +372,6 @@ bool migrate_validate_uuid(void); >> bool migrate_auto_converge(void); >> bool migrate_use_multifd(void); >> bool migrate_pause_before_switchover(void); >> -int migrate_multifd_channels(void); >> MultiFDCompression migrate_multifd_compression(void); >> int migrate_multifd_zlib_level(void); >> int migrate_multifd_zstd_level(void); >> diff --git a/migration/multifd.c b/migration/multifd.c >> index 9282ab6aa4..ce017436fb 100644 >> --- a/migration/multifd.c >> +++ b/migration/multifd.c >> @@ -225,7 +225,7 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) >> return -1; >> } >> >> - if (msg.id > migrate_multifd_channels()) { >> + if (msg.id > total_multifd_channels()) { >> error_setg(errp, "multifd: received channel version %u " >> "expected %u", msg.version, MULTIFD_VERSION); >> return -1; >> @@ -410,8 +410,8 @@ static int multifd_send_pages(QEMUFile *f) >> * using more channels, so ensure it doesn't overflow if the >> * limit is lower now. >> */ >> - next_channel %= migrate_multifd_channels(); >> - for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { >> + next_channel %= total_multifd_channels(); >> + for (i = next_channel;; i = (i + 1) % total_multifd_channels()) { >> p = &multifd_send_state->params[i]; >> >> qemu_mutex_lock(&p->mutex); >> @@ -422,7 +422,7 @@ static int multifd_send_pages(QEMUFile *f) >> } >> if (!p->pending_job) { >> p->pending_job++; >> - next_channel = (i + 1) % migrate_multifd_channels(); >> + next_channel = (i + 1) % total_multifd_channels(); >> break; >> } >> qemu_mutex_unlock(&p->mutex); >> @@ -500,7 +500,7 @@ static void multifd_send_terminate_threads(Error *err) >> return; >> } >> >> - for (i = 0; i < migrate_multifd_channels(); i++) { >> + for (i = 0; i < total_multifd_channels(); i++) { >> MultiFDSendParams *p = &multifd_send_state->params[i]; >> >> qemu_mutex_lock(&p->mutex); >> @@ -521,14 +521,14 @@ void multifd_save_cleanup(void) >> return; >> } >> multifd_send_terminate_threads(NULL); >> - for (i = 0; i < migrate_multifd_channels(); i++) { >> + for (i = 0; i < total_multifd_channels(); i++) { >> MultiFDSendParams *p = &multifd_send_state->params[i]; >> >> if (p->running) { >> qemu_thread_join(&p->thread); >> } >> } >> - for (i = 0; i < migrate_multifd_channels(); i++) { >> + for (i = 0; i < total_multifd_channels(); i++) { >> MultiFDSendParams *p = &multifd_send_state->params[i]; >> Error *local_err = NULL; >> >> @@ -594,7 +594,7 @@ int multifd_send_sync_main(QEMUFile *f) >> >> flush_zero_copy = migrate_use_zero_copy_send(); >> >> - for (i = 0; i < migrate_multifd_channels(); i++) { >> + for (i = 0; i < total_multifd_channels(); i++) { >> MultiFDSendParams *p = &multifd_send_state->params[i]; >> >> trace_multifd_send_sync_main_signal(p->id); >> @@ -627,7 +627,7 @@ int multifd_send_sync_main(QEMUFile *f) >> } >> } >> } >> - for (i = 0; i < migrate_multifd_channels(); i++) { >> + for (i = 0; i < total_multifd_channels(); i++) { >> MultiFDSendParams *p = &multifd_send_state->params[i]; >> >> trace_multifd_send_sync_main_wait(p->id); >> @@ -903,7 +903,7 @@ int multifd_save_setup(Error **errp) >> int thread_count; >> uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); >> uint8_t i; >> - >> + int idx; >> if (!migrate_use_multifd()) { >> return 0; >> } >> @@ -912,7 +912,7 @@ int multifd_save_setup(Error **errp) >> return -1; >> } >> >> - thread_count = migrate_multifd_channels(); >> + thread_count = total_multifd_channels(); >> multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); >> multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); >> multifd_send_state->pages = multifd_pages_init(page_count); >> @@ -945,8 +945,8 @@ int multifd_save_setup(Error **errp) >> } else { >> p->write_flags = 0; >> } >> - >> - socket_send_channel_create(multifd_new_send_channel_async, p); >> + idx = multifd_index(i); >> + socket_send_channel_create(multifd_new_send_channel_async, p, idx); >> } >> >> for (i = 0; i < thread_count; i++) { >> @@ -991,7 +991,7 @@ static void multifd_recv_terminate_threads(Error *err) >> } >> } >> >> - for (i = 0; i < migrate_multifd_channels(); i++) { >> + for (i = 0; i < total_multifd_channels(); i++) { >> MultiFDRecvParams *p = &multifd_recv_state->params[i]; >> >> qemu_mutex_lock(&p->mutex); >> @@ -1017,7 +1017,7 @@ int multifd_load_cleanup(Error **errp) >> return 0; >> } >> multifd_recv_terminate_threads(NULL); >> - for (i = 0; i < migrate_multifd_channels(); i++) { >> + for (i = 0; i < total_multifd_channels(); i++) { >> MultiFDRecvParams *p = &multifd_recv_state->params[i]; >> >> if (p->running) { >> @@ -1030,7 +1030,7 @@ int multifd_load_cleanup(Error **errp) >> qemu_thread_join(&p->thread); >> } >> } >> - for (i = 0; i < migrate_multifd_channels(); i++) { >> + for (i = 0; i < total_multifd_channels(); i++) { >> MultiFDRecvParams *p = &multifd_recv_state->params[i]; >> >> migration_ioc_unregister_yank(p->c); >> @@ -1065,13 +1065,13 @@ void multifd_recv_sync_main(void) >> if (!migrate_use_multifd()) { >> return; >> } >> - for (i = 0; i < migrate_multifd_channels(); i++) { >> + for (i = 0; i < total_multifd_channels(); i++) { >> MultiFDRecvParams *p = &multifd_recv_state->params[i]; >> >> trace_multifd_recv_sync_main_wait(p->id); >> qemu_sem_wait(&multifd_recv_state->sem_sync); >> } >> - for (i = 0; i < migrate_multifd_channels(); i++) { >> + for (i = 0; i < total_multifd_channels(); i++) { >> MultiFDRecvParams *p = &multifd_recv_state->params[i]; >> >> WITH_QEMU_LOCK_GUARD(&p->mutex) { >> @@ -1166,7 +1166,7 @@ int multifd_load_setup(Error **errp) >> error_setg(errp, "multifd is not supported by current protocol"); >> return -1; >> } >> - thread_count = migrate_multifd_channels(); >> + thread_count = total_multifd_channels(); >> multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); >> multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); >> qatomic_set(&multifd_recv_state->count, 0); >> @@ -1204,7 +1204,7 @@ int multifd_load_setup(Error **errp) >> >> bool multifd_recv_all_channels_created(void) >> { >> - int thread_count = migrate_multifd_channels(); >> + int thread_count = total_multifd_channels(); >> >> if (!migrate_use_multifd()) { >> return true; >> @@ -1259,5 +1259,5 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp) >> QEMU_THREAD_JOINABLE); >> qatomic_inc(&multifd_recv_state->count); >> return qatomic_read(&multifd_recv_state->count) == >> - migrate_multifd_channels(); >> + total_multifd_channels(); >> } >> diff --git a/migration/socket.c b/migration/socket.c >> index d0cb7cc6a6..c0ac6dbbe2 100644 >> --- a/migration/socket.c >> +++ b/migration/socket.c >> @@ -28,9 +28,6 @@ >> #include "trace.h" >> >> >> -struct SocketOutgoingArgs { >> - SocketAddress *saddr; >> -} outgoing_args; >> >> struct SocketArgs { >> struct SrcDestAddr data; >> @@ -43,20 +40,47 @@ struct OutgoingMigrateParams { >> uint64_t total_multifd_channel; >> } outgoing_migrate_params; >> >> -void socket_send_channel_create(QIOTaskFunc f, void *data) >> + >> +int total_multifd_channels(void) >> +{ >> + return outgoing_migrate_params.total_multifd_channel; >> +} >> + >> +int multifd_index(int i) >> +{ >> + int length = outgoing_migrate_params.length; >> + int j = 0; >> + int runn_sum = 0; >> + while (j < length) { >> + runn_sum += outgoing_migrate_params.socket_args[j].multifd_channels; >> + if (i >= runn_sum) { >> + j++; >> + } else { >> + break; >> + } >> + } >> + return j; >> +} >> + >> +void socket_send_channel_create(QIOTaskFunc f, void *data, int idx) >> { >> QIOChannelSocket *sioc = qio_channel_socket_new(); >> - qio_channel_socket_connect_async(sioc, outgoing_args.saddr, >> - f, data, NULL, NULL, NULL); >> + qio_channel_socket_connect_async(sioc, >> + outgoing_migrate_params.socket_args[idx].data.dst_addr, >> + f, data, NULL, NULL, >> + outgoing_migrate_params.socket_args[idx].data.src_addr); >> } >> >> int socket_send_channel_destroy(QIOChannel *send) >> { >> /* Remove channel */ >> object_unref(OBJECT(send)); >> - if (outgoing_args.saddr) { >> - qapi_free_SocketAddress(outgoing_args.saddr); >> - outgoing_args.saddr = NULL; >> + if (outgoing_migrate_params.socket_args != NULL) { >> + g_free(outgoing_migrate_params.socket_args); >> + outgoing_migrate_params.socket_args = NULL; >> + } >> + if (outgoing_migrate_params.length) { >> + outgoing_migrate_params.length = 0; >> } >> >> if (outgoing_migrate_params.socket_args != NULL) { >> diff --git a/migration/socket.h b/migration/socket.h >> index b9e3699167..c8b9252384 100644 >> --- a/migration/socket.h >> +++ b/migration/socket.h >> @@ -27,7 +27,9 @@ struct SrcDestAddr { >> SocketAddress *src_addr; >> }; >> >> -void socket_send_channel_create(QIOTaskFunc f, void *data); >> +int total_multifd_channels(void); >> +int multifd_index(int i); >> +void socket_send_channel_create(QIOTaskFunc f, void *data, int idx); >> int socket_send_channel_destroy(QIOChannel *send); >> >> void socket_start_incoming_migration(const char *str, uint8_t number, >> diff --git a/monitor/hmp-cmds.c b/monitor/hmp-cmds.c >> index 32a6b67d5f..9a3d76d6ba 100644 >> --- a/monitor/hmp-cmds.c >> +++ b/monitor/hmp-cmds.c >> @@ -1281,10 +1281,6 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict) >> p->has_block_incremental = true; >> visit_type_bool(v, param, &p->block_incremental, &err); >> break; >> - case MIGRATION_PARAMETER_MULTIFD_CHANNELS: >> - p->has_multifd_channels = true; >> - visit_type_uint8(v, param, &p->multifd_channels, &err); >> - break; >> case MIGRATION_PARAMETER_MULTIFD_COMPRESSION: >> p->has_multifd_compression = true; >> visit_type_MultiFDCompression(v, param, &p->multifd_compression, >> diff --git a/qapi/migration.json b/qapi/migration.json >> index 62a7b22d19..1b1c6d01d3 100644 >> --- a/qapi/migration.json >> +++ b/qapi/migration.json >> @@ -877,11 +877,6 @@ >> # migrated and the destination must already have access to the >> # same backing chain as was used on the source. (since 2.10) >> # >> -# @multifd-channels: Number of channels used to migrate data in >> -# parallel. This is the same number that the >> -# number of sockets used for migration. The >> -# default value is 2 (since 4.0) >> -# >> # @xbzrle-cache-size: cache size to be used by XBZRLE migration. It >> # needs to be a multiple of the target page size >> # and a power of 2 >> @@ -965,7 +960,6 @@ >> '*x-checkpoint-delay': { 'type': 'uint32', >> 'features': [ 'unstable' ] }, >> '*block-incremental': 'bool', >> - '*multifd-channels': 'uint8', >> '*xbzrle-cache-size': 'size', >> '*max-postcopy-bandwidth': 'size', >> '*max-cpu-throttle': 'uint8', >> -- >> 2.22.3 >>
diff --git a/migration/migration.c b/migration/migration.c index 9b0ad732e7..57dd4494b4 100644 --- a/migration/migration.c +++ b/migration/migration.c @@ -1585,9 +1585,6 @@ static void migrate_params_test_apply(MigrateSetParameters *params, if (params->has_block_incremental) { dest->block_incremental = params->block_incremental; } - if (params->has_multifd_channels) { - dest->multifd_channels = params->multifd_channels; - } if (params->has_multifd_compression) { dest->multifd_compression = params->multifd_compression; } @@ -1702,9 +1699,6 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp) if (params->has_block_incremental) { s->parameters.block_incremental = params->block_incremental; } - if (params->has_multifd_channels) { - s->parameters.multifd_channels = params->multifd_channels; - } if (params->has_multifd_compression) { s->parameters.multifd_compression = params->multifd_compression; } @@ -2686,15 +2680,6 @@ bool migrate_pause_before_switchover(void) MIGRATION_CAPABILITY_PAUSE_BEFORE_SWITCHOVER]; } -int migrate_multifd_channels(void) -{ - MigrationState *s; - - s = migrate_get_current(); - - return s->parameters.multifd_channels; -} - MultiFDCompression migrate_multifd_compression(void) { MigrationState *s; diff --git a/migration/migration.h b/migration/migration.h index fa8717ec9e..9464de8ef7 100644 --- a/migration/migration.h +++ b/migration/migration.h @@ -372,7 +372,6 @@ bool migrate_validate_uuid(void); bool migrate_auto_converge(void); bool migrate_use_multifd(void); bool migrate_pause_before_switchover(void); -int migrate_multifd_channels(void); MultiFDCompression migrate_multifd_compression(void); int migrate_multifd_zlib_level(void); int migrate_multifd_zstd_level(void); diff --git a/migration/multifd.c b/migration/multifd.c index 9282ab6aa4..ce017436fb 100644 --- a/migration/multifd.c +++ b/migration/multifd.c @@ -225,7 +225,7 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error **errp) return -1; } - if (msg.id > migrate_multifd_channels()) { + if (msg.id > total_multifd_channels()) { error_setg(errp, "multifd: received channel version %u " "expected %u", msg.version, MULTIFD_VERSION); return -1; @@ -410,8 +410,8 @@ static int multifd_send_pages(QEMUFile *f) * using more channels, so ensure it doesn't overflow if the * limit is lower now. */ - next_channel %= migrate_multifd_channels(); - for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) { + next_channel %= total_multifd_channels(); + for (i = next_channel;; i = (i + 1) % total_multifd_channels()) { p = &multifd_send_state->params[i]; qemu_mutex_lock(&p->mutex); @@ -422,7 +422,7 @@ static int multifd_send_pages(QEMUFile *f) } if (!p->pending_job) { p->pending_job++; - next_channel = (i + 1) % migrate_multifd_channels(); + next_channel = (i + 1) % total_multifd_channels(); break; } qemu_mutex_unlock(&p->mutex); @@ -500,7 +500,7 @@ static void multifd_send_terminate_threads(Error *err) return; } - for (i = 0; i < migrate_multifd_channels(); i++) { + for (i = 0; i < total_multifd_channels(); i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; qemu_mutex_lock(&p->mutex); @@ -521,14 +521,14 @@ void multifd_save_cleanup(void) return; } multifd_send_terminate_threads(NULL); - for (i = 0; i < migrate_multifd_channels(); i++) { + for (i = 0; i < total_multifd_channels(); i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; if (p->running) { qemu_thread_join(&p->thread); } } - for (i = 0; i < migrate_multifd_channels(); i++) { + for (i = 0; i < total_multifd_channels(); i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; Error *local_err = NULL; @@ -594,7 +594,7 @@ int multifd_send_sync_main(QEMUFile *f) flush_zero_copy = migrate_use_zero_copy_send(); - for (i = 0; i < migrate_multifd_channels(); i++) { + for (i = 0; i < total_multifd_channels(); i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; trace_multifd_send_sync_main_signal(p->id); @@ -627,7 +627,7 @@ int multifd_send_sync_main(QEMUFile *f) } } } - for (i = 0; i < migrate_multifd_channels(); i++) { + for (i = 0; i < total_multifd_channels(); i++) { MultiFDSendParams *p = &multifd_send_state->params[i]; trace_multifd_send_sync_main_wait(p->id); @@ -903,7 +903,7 @@ int multifd_save_setup(Error **errp) int thread_count; uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size(); uint8_t i; - + int idx; if (!migrate_use_multifd()) { return 0; } @@ -912,7 +912,7 @@ int multifd_save_setup(Error **errp) return -1; } - thread_count = migrate_multifd_channels(); + thread_count = total_multifd_channels(); multifd_send_state = g_malloc0(sizeof(*multifd_send_state)); multifd_send_state->params = g_new0(MultiFDSendParams, thread_count); multifd_send_state->pages = multifd_pages_init(page_count); @@ -945,8 +945,8 @@ int multifd_save_setup(Error **errp) } else { p->write_flags = 0; } - - socket_send_channel_create(multifd_new_send_channel_async, p); + idx = multifd_index(i); + socket_send_channel_create(multifd_new_send_channel_async, p, idx); } for (i = 0; i < thread_count; i++) { @@ -991,7 +991,7 @@ static void multifd_recv_terminate_threads(Error *err) } } - for (i = 0; i < migrate_multifd_channels(); i++) { + for (i = 0; i < total_multifd_channels(); i++) { MultiFDRecvParams *p = &multifd_recv_state->params[i]; qemu_mutex_lock(&p->mutex); @@ -1017,7 +1017,7 @@ int multifd_load_cleanup(Error **errp) return 0; } multifd_recv_terminate_threads(NULL); - for (i = 0; i < migrate_multifd_channels(); i++) { + for (i = 0; i < total_multifd_channels(); i++) { MultiFDRecvParams *p = &multifd_recv_state->params[i]; if (p->running) { @@ -1030,7 +1030,7 @@ int multifd_load_cleanup(Error **errp) qemu_thread_join(&p->thread); } } - for (i = 0; i < migrate_multifd_channels(); i++) { + for (i = 0; i < total_multifd_channels(); i++) { MultiFDRecvParams *p = &multifd_recv_state->params[i]; migration_ioc_unregister_yank(p->c); @@ -1065,13 +1065,13 @@ void multifd_recv_sync_main(void) if (!migrate_use_multifd()) { return; } - for (i = 0; i < migrate_multifd_channels(); i++) { + for (i = 0; i < total_multifd_channels(); i++) { MultiFDRecvParams *p = &multifd_recv_state->params[i]; trace_multifd_recv_sync_main_wait(p->id); qemu_sem_wait(&multifd_recv_state->sem_sync); } - for (i = 0; i < migrate_multifd_channels(); i++) { + for (i = 0; i < total_multifd_channels(); i++) { MultiFDRecvParams *p = &multifd_recv_state->params[i]; WITH_QEMU_LOCK_GUARD(&p->mutex) { @@ -1166,7 +1166,7 @@ int multifd_load_setup(Error **errp) error_setg(errp, "multifd is not supported by current protocol"); return -1; } - thread_count = migrate_multifd_channels(); + thread_count = total_multifd_channels(); multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state)); multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count); qatomic_set(&multifd_recv_state->count, 0); @@ -1204,7 +1204,7 @@ int multifd_load_setup(Error **errp) bool multifd_recv_all_channels_created(void) { - int thread_count = migrate_multifd_channels(); + int thread_count = total_multifd_channels(); if (!migrate_use_multifd()) { return true; @@ -1259,5 +1259,5 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp) QEMU_THREAD_JOINABLE); qatomic_inc(&multifd_recv_state->count); return qatomic_read(&multifd_recv_state->count) == - migrate_multifd_channels(); + total_multifd_channels(); } diff --git a/migration/socket.c b/migration/socket.c index d0cb7cc6a6..c0ac6dbbe2 100644 --- a/migration/socket.c +++ b/migration/socket.c @@ -28,9 +28,6 @@ #include "trace.h" -struct SocketOutgoingArgs { - SocketAddress *saddr; -} outgoing_args; struct SocketArgs { struct SrcDestAddr data; @@ -43,20 +40,47 @@ struct OutgoingMigrateParams { uint64_t total_multifd_channel; } outgoing_migrate_params; -void socket_send_channel_create(QIOTaskFunc f, void *data) + +int total_multifd_channels(void) +{ + return outgoing_migrate_params.total_multifd_channel; +} + +int multifd_index(int i) +{ + int length = outgoing_migrate_params.length; + int j = 0; + int runn_sum = 0; + while (j < length) { + runn_sum += outgoing_migrate_params.socket_args[j].multifd_channels; + if (i >= runn_sum) { + j++; + } else { + break; + } + } + return j; +} + +void socket_send_channel_create(QIOTaskFunc f, void *data, int idx) { QIOChannelSocket *sioc = qio_channel_socket_new(); - qio_channel_socket_connect_async(sioc, outgoing_args.saddr, - f, data, NULL, NULL, NULL); + qio_channel_socket_connect_async(sioc, + outgoing_migrate_params.socket_args[idx].data.dst_addr, + f, data, NULL, NULL, + outgoing_migrate_params.socket_args[idx].data.src_addr); } int socket_send_channel_destroy(QIOChannel *send) { /* Remove channel */ object_unref(OBJECT(send)); - if (outgoing_args.saddr) { - qapi_free_SocketAddress(outgoing_args.saddr); - outgoing_args.saddr = NULL; + if (outgoing_migrate_params.socket_args != NULL) { + g_free(outgoing_migrate_params.socket_args); + outgoing_migrate_params.socket_args = NULL; + } + if (outgoing_migrate_params.length) { + outgoing_migrate_params.length = 0; } if (outgoing_migrate_params.socket_args != NULL) { diff --git a/migration/socket.h b/migration/socket.h index b9e3699167..c8b9252384 100644 --- a/migration/socket.h +++ b/migration/socket.h @@ -27,7 +27,9 @@ struct SrcDestAddr { SocketAddress *src_addr; }; -void socket_send_channel_create(QIOTaskFunc f, void *data); +int total_multifd_channels(void); +int multifd_index(int i); +void socket_send_channel_create(QIOTaskFunc f, void *data, int idx); int socket_send_channel_destroy(QIOChannel *send); void socket_start_incoming_migration(const char *str, uint8_t number, diff --git a/monitor/hmp-cmds.c b/monitor/hmp-cmds.c index 32a6b67d5f..9a3d76d6ba 100644 --- a/monitor/hmp-cmds.c +++ b/monitor/hmp-cmds.c @@ -1281,10 +1281,6 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict) p->has_block_incremental = true; visit_type_bool(v, param, &p->block_incremental, &err); break; - case MIGRATION_PARAMETER_MULTIFD_CHANNELS: - p->has_multifd_channels = true; - visit_type_uint8(v, param, &p->multifd_channels, &err); - break; case MIGRATION_PARAMETER_MULTIFD_COMPRESSION: p->has_multifd_compression = true; visit_type_MultiFDCompression(v, param, &p->multifd_compression, diff --git a/qapi/migration.json b/qapi/migration.json index 62a7b22d19..1b1c6d01d3 100644 --- a/qapi/migration.json +++ b/qapi/migration.json @@ -877,11 +877,6 @@ # migrated and the destination must already have access to the # same backing chain as was used on the source. (since 2.10) # -# @multifd-channels: Number of channels used to migrate data in -# parallel. This is the same number that the -# number of sockets used for migration. The -# default value is 2 (since 4.0) -# # @xbzrle-cache-size: cache size to be used by XBZRLE migration. It # needs to be a multiple of the target page size # and a power of 2 @@ -965,7 +960,6 @@ '*x-checkpoint-delay': { 'type': 'uint32', 'features': [ 'unstable' ] }, '*block-incremental': 'bool', - '*multifd-channels': 'uint8', '*xbzrle-cache-size': 'size', '*max-postcopy-bandwidth': 'size', '*max-cpu-throttle': 'uint8',
i) Dynamically decide appropriate source and destination ip pairs for the corresponding multi-FD channel to be connected. ii) Removed the support for setting the number of multi-fd channels from qmp commands. As now all multiFD parameters will be passed via qmp: migrate command or incoming flag itself. Suggested-by: Manish Mishra <manish.mishra@nutanix.com> Signed-off-by: Het Gala <het.gala@nutanix.com> --- migration/migration.c | 15 --------------- migration/migration.h | 1 - migration/multifd.c | 42 +++++++++++++++++++++--------------------- migration/socket.c | 42 +++++++++++++++++++++++++++++++++--------- migration/socket.h | 4 +++- monitor/hmp-cmds.c | 4 ---- qapi/migration.json | 6 ------ 7 files changed, 57 insertions(+), 57 deletions(-)