diff mbox series

[4/4] Adding support for multi-FD connections dynamically

Message ID 20220609073305.142515-5-het.gala@nutanix.com (mailing list archive)
State New, archived
Headers show
Series Multiple interface support on top of Multi-FD | expand

Commit Message

Het Gala June 9, 2022, 7:33 a.m. UTC
i) Dynamically decide the appropriate source and destination IP pair for the
   corresponding multi-FD channel to be connected.

ii) Remove support for setting the number of multi-FD channels via QMP
    commands, since all multi-FD parameters will now be passed via the QMP
    migrate command or the incoming flag itself.

Suggested-by: Manish Mishra <manish.mishra@nutanix.com>
Signed-off-by: Het Gala <het.gala@nutanix.com>
---
 migration/migration.c | 15 ---------------
 migration/migration.h |  1 -
 migration/multifd.c   | 42 +++++++++++++++++++++---------------------
 migration/socket.c    | 42 +++++++++++++++++++++++++++++++++---------
 migration/socket.h    |  4 +++-
 monitor/hmp-cmds.c    |  4 ----
 qapi/migration.json   |  6 ------
 7 files changed, 57 insertions(+), 57 deletions(-)

Comments

Dr. David Alan Gilbert June 16, 2022, 6:47 p.m. UTC | #1
* Het Gala (het.gala@nutanix.com) wrote:
> i) Dynamically decide appropriate source and destination ip pairs for the
>    corresponding multi-FD channel to be connected.
> 
> ii) Removed the support for setting the number of multi-fd channels from qmp
>     commands. As now all multiFD parameters will be passed via qmp: migrate
>     command or incoming flag itself.

We can't do that, because it's part of the API already; what you'll need
to do is check that the number of entries in your list corresponds to
the value set there and error if it's different.

Dave

> Suggested-by: Manish Mishra <manish.mishra@nutanix.com>
> Signed-off-by: Het Gala <het.gala@nutanix.com>
> ---
>  migration/migration.c | 15 ---------------
>  migration/migration.h |  1 -
>  migration/multifd.c   | 42 +++++++++++++++++++++---------------------
>  migration/socket.c    | 42 +++++++++++++++++++++++++++++++++---------
>  migration/socket.h    |  4 +++-
>  monitor/hmp-cmds.c    |  4 ----
>  qapi/migration.json   |  6 ------
>  7 files changed, 57 insertions(+), 57 deletions(-)
> 
> diff --git a/migration/migration.c b/migration/migration.c
> index 9b0ad732e7..57dd4494b4 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -1585,9 +1585,6 @@ static void migrate_params_test_apply(MigrateSetParameters *params,
>      if (params->has_block_incremental) {
>          dest->block_incremental = params->block_incremental;
>      }
> -    if (params->has_multifd_channels) {
> -        dest->multifd_channels = params->multifd_channels;
> -    }
>      if (params->has_multifd_compression) {
>          dest->multifd_compression = params->multifd_compression;
>      }
> @@ -1702,9 +1699,6 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
>      if (params->has_block_incremental) {
>          s->parameters.block_incremental = params->block_incremental;
>      }
> -    if (params->has_multifd_channels) {
> -        s->parameters.multifd_channels = params->multifd_channels;
> -    }
>      if (params->has_multifd_compression) {
>          s->parameters.multifd_compression = params->multifd_compression;
>      }
> @@ -2686,15 +2680,6 @@ bool migrate_pause_before_switchover(void)
>          MIGRATION_CAPABILITY_PAUSE_BEFORE_SWITCHOVER];
>  }
>  
> -int migrate_multifd_channels(void)
> -{
> -    MigrationState *s;
> -
> -    s = migrate_get_current();
> -
> -    return s->parameters.multifd_channels;
> -}
> -
>  MultiFDCompression migrate_multifd_compression(void)
>  {
>      MigrationState *s;
> diff --git a/migration/migration.h b/migration/migration.h
> index fa8717ec9e..9464de8ef7 100644
> --- a/migration/migration.h
> +++ b/migration/migration.h
> @@ -372,7 +372,6 @@ bool migrate_validate_uuid(void);
>  bool migrate_auto_converge(void);
>  bool migrate_use_multifd(void);
>  bool migrate_pause_before_switchover(void);
> -int migrate_multifd_channels(void);
>  MultiFDCompression migrate_multifd_compression(void);
>  int migrate_multifd_zlib_level(void);
>  int migrate_multifd_zstd_level(void);
> diff --git a/migration/multifd.c b/migration/multifd.c
> index 9282ab6aa4..ce017436fb 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -225,7 +225,7 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
>          return -1;
>      }
>  
> -    if (msg.id > migrate_multifd_channels()) {
> +    if (msg.id > total_multifd_channels()) {
>          error_setg(errp, "multifd: received channel version %u "
>                     "expected %u", msg.version, MULTIFD_VERSION);
>          return -1;
> @@ -410,8 +410,8 @@ static int multifd_send_pages(QEMUFile *f)
>       * using more channels, so ensure it doesn't overflow if the
>       * limit is lower now.
>       */
> -    next_channel %= migrate_multifd_channels();
> -    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
> +    next_channel %= total_multifd_channels();
> +    for (i = next_channel;; i = (i + 1) % total_multifd_channels()) {
>          p = &multifd_send_state->params[i];
>  
>          qemu_mutex_lock(&p->mutex);
> @@ -422,7 +422,7 @@ static int multifd_send_pages(QEMUFile *f)
>          }
>          if (!p->pending_job) {
>              p->pending_job++;
> -            next_channel = (i + 1) % migrate_multifd_channels();
> +            next_channel = (i + 1) % total_multifd_channels();
>              break;
>          }
>          qemu_mutex_unlock(&p->mutex);
> @@ -500,7 +500,7 @@ static void multifd_send_terminate_threads(Error *err)
>          return;
>      }
>  
> -    for (i = 0; i < migrate_multifd_channels(); i++) {
> +    for (i = 0; i < total_multifd_channels(); i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>  
>          qemu_mutex_lock(&p->mutex);
> @@ -521,14 +521,14 @@ void multifd_save_cleanup(void)
>          return;
>      }
>      multifd_send_terminate_threads(NULL);
> -    for (i = 0; i < migrate_multifd_channels(); i++) {
> +    for (i = 0; i < total_multifd_channels(); i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>  
>          if (p->running) {
>              qemu_thread_join(&p->thread);
>          }
>      }
> -    for (i = 0; i < migrate_multifd_channels(); i++) {
> +    for (i = 0; i < total_multifd_channels(); i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>          Error *local_err = NULL;
>  
> @@ -594,7 +594,7 @@ int multifd_send_sync_main(QEMUFile *f)
>  
>      flush_zero_copy = migrate_use_zero_copy_send();
>  
> -    for (i = 0; i < migrate_multifd_channels(); i++) {
> +    for (i = 0; i < total_multifd_channels(); i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>  
>          trace_multifd_send_sync_main_signal(p->id);
> @@ -627,7 +627,7 @@ int multifd_send_sync_main(QEMUFile *f)
>              }
>          }
>      }
> -    for (i = 0; i < migrate_multifd_channels(); i++) {
> +    for (i = 0; i < total_multifd_channels(); i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>  
>          trace_multifd_send_sync_main_wait(p->id);
> @@ -903,7 +903,7 @@ int multifd_save_setup(Error **errp)
>      int thread_count;
>      uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
>      uint8_t i;
> -
> +    int idx;
>      if (!migrate_use_multifd()) {
>          return 0;
>      }
> @@ -912,7 +912,7 @@ int multifd_save_setup(Error **errp)
>          return -1;
>      }
>  
> -    thread_count = migrate_multifd_channels();
> +    thread_count = total_multifd_channels();
>      multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
>      multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
>      multifd_send_state->pages = multifd_pages_init(page_count);
> @@ -945,8 +945,8 @@ int multifd_save_setup(Error **errp)
>          } else {
>              p->write_flags = 0;
>          }
> -
> -        socket_send_channel_create(multifd_new_send_channel_async, p);
> +        idx = multifd_index(i);
> +        socket_send_channel_create(multifd_new_send_channel_async, p, idx);
>      }
>  
>      for (i = 0; i < thread_count; i++) {
> @@ -991,7 +991,7 @@ static void multifd_recv_terminate_threads(Error *err)
>          }
>      }
>  
> -    for (i = 0; i < migrate_multifd_channels(); i++) {
> +    for (i = 0; i < total_multifd_channels(); i++) {
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>  
>          qemu_mutex_lock(&p->mutex);
> @@ -1017,7 +1017,7 @@ int multifd_load_cleanup(Error **errp)
>          return 0;
>      }
>      multifd_recv_terminate_threads(NULL);
> -    for (i = 0; i < migrate_multifd_channels(); i++) {
> +    for (i = 0; i < total_multifd_channels(); i++) {
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>  
>          if (p->running) {
> @@ -1030,7 +1030,7 @@ int multifd_load_cleanup(Error **errp)
>              qemu_thread_join(&p->thread);
>          }
>      }
> -    for (i = 0; i < migrate_multifd_channels(); i++) {
> +    for (i = 0; i < total_multifd_channels(); i++) {
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>  
>          migration_ioc_unregister_yank(p->c);
> @@ -1065,13 +1065,13 @@ void multifd_recv_sync_main(void)
>      if (!migrate_use_multifd()) {
>          return;
>      }
> -    for (i = 0; i < migrate_multifd_channels(); i++) {
> +    for (i = 0; i < total_multifd_channels(); i++) {
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>  
>          trace_multifd_recv_sync_main_wait(p->id);
>          qemu_sem_wait(&multifd_recv_state->sem_sync);
>      }
> -    for (i = 0; i < migrate_multifd_channels(); i++) {
> +    for (i = 0; i < total_multifd_channels(); i++) {
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
>  
>          WITH_QEMU_LOCK_GUARD(&p->mutex) {
> @@ -1166,7 +1166,7 @@ int multifd_load_setup(Error **errp)
>          error_setg(errp, "multifd is not supported by current protocol");
>          return -1;
>      }
> -    thread_count = migrate_multifd_channels();
> +    thread_count = total_multifd_channels();
>      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
>      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
>      qatomic_set(&multifd_recv_state->count, 0);
> @@ -1204,7 +1204,7 @@ int multifd_load_setup(Error **errp)
>  
>  bool multifd_recv_all_channels_created(void)
>  {
> -    int thread_count = migrate_multifd_channels();
> +    int thread_count = total_multifd_channels();
>  
>      if (!migrate_use_multifd()) {
>          return true;
> @@ -1259,5 +1259,5 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
>                         QEMU_THREAD_JOINABLE);
>      qatomic_inc(&multifd_recv_state->count);
>      return qatomic_read(&multifd_recv_state->count) ==
> -           migrate_multifd_channels();
> +           total_multifd_channels();
>  }
> diff --git a/migration/socket.c b/migration/socket.c
> index d0cb7cc6a6..c0ac6dbbe2 100644
> --- a/migration/socket.c
> +++ b/migration/socket.c
> @@ -28,9 +28,6 @@
>  #include "trace.h"
>  
>  
> -struct SocketOutgoingArgs {
> -    SocketAddress *saddr;
> -} outgoing_args;
>  
>  struct SocketArgs {
>      struct SrcDestAddr data;
> @@ -43,20 +40,47 @@ struct OutgoingMigrateParams {
>      uint64_t total_multifd_channel;
>  } outgoing_migrate_params;
>  
> -void socket_send_channel_create(QIOTaskFunc f, void *data)
> +
> +int total_multifd_channels(void)
> +{
> +    return outgoing_migrate_params.total_multifd_channel;
> +}
> +
> +int multifd_index(int i)
> +{
> +    int length = outgoing_migrate_params.length;
> +    int j = 0;
> +    int runn_sum = 0;
> +    while (j < length) {
> +        runn_sum += outgoing_migrate_params.socket_args[j].multifd_channels;
> +        if (i >= runn_sum) {
> +            j++;
> +        } else {
> +            break;
> +        }
> +    }
> +    return j;
> +}
> +
> +void socket_send_channel_create(QIOTaskFunc f, void *data, int idx)
>  {
>      QIOChannelSocket *sioc = qio_channel_socket_new();
> -    qio_channel_socket_connect_async(sioc, outgoing_args.saddr,
> -                                     f, data, NULL, NULL, NULL);
> +    qio_channel_socket_connect_async(sioc,
> +                       outgoing_migrate_params.socket_args[idx].data.dst_addr,
> +                       f, data, NULL, NULL,
> +                       outgoing_migrate_params.socket_args[idx].data.src_addr);
>  }
>  
>  int socket_send_channel_destroy(QIOChannel *send)
>  {
>      /* Remove channel */
>      object_unref(OBJECT(send));
> -    if (outgoing_args.saddr) {
> -        qapi_free_SocketAddress(outgoing_args.saddr);
> -        outgoing_args.saddr = NULL;
> +    if (outgoing_migrate_params.socket_args != NULL) {
> +        g_free(outgoing_migrate_params.socket_args);
> +        outgoing_migrate_params.socket_args = NULL;
> +    }
> +    if (outgoing_migrate_params.length) {
> +        outgoing_migrate_params.length = 0;
>      }
>  
>      if (outgoing_migrate_params.socket_args != NULL) {
> diff --git a/migration/socket.h b/migration/socket.h
> index b9e3699167..c8b9252384 100644
> --- a/migration/socket.h
> +++ b/migration/socket.h
> @@ -27,7 +27,9 @@ struct SrcDestAddr {
>      SocketAddress *src_addr;
>  };
>  
> -void socket_send_channel_create(QIOTaskFunc f, void *data);
> +int total_multifd_channels(void);
> +int multifd_index(int i);
> +void socket_send_channel_create(QIOTaskFunc f, void *data, int idx);
>  int socket_send_channel_destroy(QIOChannel *send);
>  
>  void socket_start_incoming_migration(const char *str, uint8_t number,
> diff --git a/monitor/hmp-cmds.c b/monitor/hmp-cmds.c
> index 32a6b67d5f..9a3d76d6ba 100644
> --- a/monitor/hmp-cmds.c
> +++ b/monitor/hmp-cmds.c
> @@ -1281,10 +1281,6 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
>          p->has_block_incremental = true;
>          visit_type_bool(v, param, &p->block_incremental, &err);
>          break;
> -    case MIGRATION_PARAMETER_MULTIFD_CHANNELS:
> -        p->has_multifd_channels = true;
> -        visit_type_uint8(v, param, &p->multifd_channels, &err);
> -        break;
>      case MIGRATION_PARAMETER_MULTIFD_COMPRESSION:
>          p->has_multifd_compression = true;
>          visit_type_MultiFDCompression(v, param, &p->multifd_compression,
> diff --git a/qapi/migration.json b/qapi/migration.json
> index 62a7b22d19..1b1c6d01d3 100644
> --- a/qapi/migration.json
> +++ b/qapi/migration.json
> @@ -877,11 +877,6 @@
>  #                     migrated and the destination must already have access to the
>  #                     same backing chain as was used on the source.  (since 2.10)
>  #
> -# @multifd-channels: Number of channels used to migrate data in
> -#                    parallel. This is the same number that the
> -#                    number of sockets used for migration.  The
> -#                    default value is 2 (since 4.0)
> -#
>  # @xbzrle-cache-size: cache size to be used by XBZRLE migration.  It
>  #                     needs to be a multiple of the target page size
>  #                     and a power of 2
> @@ -965,7 +960,6 @@
>              '*x-checkpoint-delay': { 'type': 'uint32',
>                                       'features': [ 'unstable' ] },
>              '*block-incremental': 'bool',
> -            '*multifd-channels': 'uint8',
>              '*xbzrle-cache-size': 'size',
>              '*max-postcopy-bandwidth': 'size',
>              '*max-cpu-throttle': 'uint8',
> -- 
> 2.22.3
>
Manish June 21, 2022, 4:12 p.m. UTC | #2
On 17/06/22 12:17 am, Dr. David Alan Gilbert wrote:
> * Het Gala (het.gala@nutanix.com) wrote:
>> i) Dynamically decide appropriate source and destination ip pairs for the
>>     corresponding multi-FD channel to be connected.
>>
>> ii) Removed the support for setting the number of multi-fd channels from qmp
>>      commands. As now all multiFD parameters will be passed via qmp: migrate
>>      command or incoming flag itself.
> We can't do that, because it's part of the API already; what you'll need
> to do is check that the number of entries in your list corresponds to
> the value set there and error if it's different.
>
> Dave
>
Thanks for the review, David. Yes, we will make sure in v2 that nothing existing breaks.

- Manish Mishra

>> Suggested-by: Manish Mishra <manish.mishra@nutanix.com>
>> Signed-off-by: Het Gala <het.gala@nutanix.com>
>> ---
>>   migration/migration.c | 15 ---------------
>>   migration/migration.h |  1 -
>>   migration/multifd.c   | 42 +++++++++++++++++++++---------------------
>>   migration/socket.c    | 42 +++++++++++++++++++++++++++++++++---------
>>   migration/socket.h    |  4 +++-
>>   monitor/hmp-cmds.c    |  4 ----
>>   qapi/migration.json   |  6 ------
>>   7 files changed, 57 insertions(+), 57 deletions(-)
>>
>> diff --git a/migration/migration.c b/migration/migration.c
>> index 9b0ad732e7..57dd4494b4 100644
>> --- a/migration/migration.c
>> +++ b/migration/migration.c
>> @@ -1585,9 +1585,6 @@ static void migrate_params_test_apply(MigrateSetParameters *params,
>>       if (params->has_block_incremental) {
>>           dest->block_incremental = params->block_incremental;
>>       }
>> -    if (params->has_multifd_channels) {
>> -        dest->multifd_channels = params->multifd_channels;
>> -    }
>>       if (params->has_multifd_compression) {
>>           dest->multifd_compression = params->multifd_compression;
>>       }
>> @@ -1702,9 +1699,6 @@ static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
>>       if (params->has_block_incremental) {
>>           s->parameters.block_incremental = params->block_incremental;
>>       }
>> -    if (params->has_multifd_channels) {
>> -        s->parameters.multifd_channels = params->multifd_channels;
>> -    }
>>       if (params->has_multifd_compression) {
>>           s->parameters.multifd_compression = params->multifd_compression;
>>       }
>> @@ -2686,15 +2680,6 @@ bool migrate_pause_before_switchover(void)
>>           MIGRATION_CAPABILITY_PAUSE_BEFORE_SWITCHOVER];
>>   }
>>   
>> -int migrate_multifd_channels(void)
>> -{
>> -    MigrationState *s;
>> -
>> -    s = migrate_get_current();
>> -
>> -    return s->parameters.multifd_channels;
>> -}
>> -
>>   MultiFDCompression migrate_multifd_compression(void)
>>   {
>>       MigrationState *s;
>> diff --git a/migration/migration.h b/migration/migration.h
>> index fa8717ec9e..9464de8ef7 100644
>> --- a/migration/migration.h
>> +++ b/migration/migration.h
>> @@ -372,7 +372,6 @@ bool migrate_validate_uuid(void);
>>   bool migrate_auto_converge(void);
>>   bool migrate_use_multifd(void);
>>   bool migrate_pause_before_switchover(void);
>> -int migrate_multifd_channels(void);
>>   MultiFDCompression migrate_multifd_compression(void);
>>   int migrate_multifd_zlib_level(void);
>>   int migrate_multifd_zstd_level(void);
>> diff --git a/migration/multifd.c b/migration/multifd.c
>> index 9282ab6aa4..ce017436fb 100644
>> --- a/migration/multifd.c
>> +++ b/migration/multifd.c
>> @@ -225,7 +225,7 @@ static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
>>           return -1;
>>       }
>>   
>> -    if (msg.id > migrate_multifd_channels()) {
>> +    if (msg.id > total_multifd_channels()) {
>>           error_setg(errp, "multifd: received channel version %u "
>>                      "expected %u", msg.version, MULTIFD_VERSION);
>>           return -1;
>> @@ -410,8 +410,8 @@ static int multifd_send_pages(QEMUFile *f)
>>        * using more channels, so ensure it doesn't overflow if the
>>        * limit is lower now.
>>        */
>> -    next_channel %= migrate_multifd_channels();
>> -    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
>> +    next_channel %= total_multifd_channels();
>> +    for (i = next_channel;; i = (i + 1) % total_multifd_channels()) {
>>           p = &multifd_send_state->params[i];
>>   
>>           qemu_mutex_lock(&p->mutex);
>> @@ -422,7 +422,7 @@ static int multifd_send_pages(QEMUFile *f)
>>           }
>>           if (!p->pending_job) {
>>               p->pending_job++;
>> -            next_channel = (i + 1) % migrate_multifd_channels();
>> +            next_channel = (i + 1) % total_multifd_channels();
>>               break;
>>           }
>>           qemu_mutex_unlock(&p->mutex);
>> @@ -500,7 +500,7 @@ static void multifd_send_terminate_threads(Error *err)
>>           return;
>>       }
>>   
>> -    for (i = 0; i < migrate_multifd_channels(); i++) {
>> +    for (i = 0; i < total_multifd_channels(); i++) {
>>           MultiFDSendParams *p = &multifd_send_state->params[i];
>>   
>>           qemu_mutex_lock(&p->mutex);
>> @@ -521,14 +521,14 @@ void multifd_save_cleanup(void)
>>           return;
>>       }
>>       multifd_send_terminate_threads(NULL);
>> -    for (i = 0; i < migrate_multifd_channels(); i++) {
>> +    for (i = 0; i < total_multifd_channels(); i++) {
>>           MultiFDSendParams *p = &multifd_send_state->params[i];
>>   
>>           if (p->running) {
>>               qemu_thread_join(&p->thread);
>>           }
>>       }
>> -    for (i = 0; i < migrate_multifd_channels(); i++) {
>> +    for (i = 0; i < total_multifd_channels(); i++) {
>>           MultiFDSendParams *p = &multifd_send_state->params[i];
>>           Error *local_err = NULL;
>>   
>> @@ -594,7 +594,7 @@ int multifd_send_sync_main(QEMUFile *f)
>>   
>>       flush_zero_copy = migrate_use_zero_copy_send();
>>   
>> -    for (i = 0; i < migrate_multifd_channels(); i++) {
>> +    for (i = 0; i < total_multifd_channels(); i++) {
>>           MultiFDSendParams *p = &multifd_send_state->params[i];
>>   
>>           trace_multifd_send_sync_main_signal(p->id);
>> @@ -627,7 +627,7 @@ int multifd_send_sync_main(QEMUFile *f)
>>               }
>>           }
>>       }
>> -    for (i = 0; i < migrate_multifd_channels(); i++) {
>> +    for (i = 0; i < total_multifd_channels(); i++) {
>>           MultiFDSendParams *p = &multifd_send_state->params[i];
>>   
>>           trace_multifd_send_sync_main_wait(p->id);
>> @@ -903,7 +903,7 @@ int multifd_save_setup(Error **errp)
>>       int thread_count;
>>       uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
>>       uint8_t i;
>> -
>> +    int idx;
>>       if (!migrate_use_multifd()) {
>>           return 0;
>>       }
>> @@ -912,7 +912,7 @@ int multifd_save_setup(Error **errp)
>>           return -1;
>>       }
>>   
>> -    thread_count = migrate_multifd_channels();
>> +    thread_count = total_multifd_channels();
>>       multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
>>       multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
>>       multifd_send_state->pages = multifd_pages_init(page_count);
>> @@ -945,8 +945,8 @@ int multifd_save_setup(Error **errp)
>>           } else {
>>               p->write_flags = 0;
>>           }
>> -
>> -        socket_send_channel_create(multifd_new_send_channel_async, p);
>> +        idx = multifd_index(i);
>> +        socket_send_channel_create(multifd_new_send_channel_async, p, idx);
>>       }
>>   
>>       for (i = 0; i < thread_count; i++) {
>> @@ -991,7 +991,7 @@ static void multifd_recv_terminate_threads(Error *err)
>>           }
>>       }
>>   
>> -    for (i = 0; i < migrate_multifd_channels(); i++) {
>> +    for (i = 0; i < total_multifd_channels(); i++) {
>>           MultiFDRecvParams *p = &multifd_recv_state->params[i];
>>   
>>           qemu_mutex_lock(&p->mutex);
>> @@ -1017,7 +1017,7 @@ int multifd_load_cleanup(Error **errp)
>>           return 0;
>>       }
>>       multifd_recv_terminate_threads(NULL);
>> -    for (i = 0; i < migrate_multifd_channels(); i++) {
>> +    for (i = 0; i < total_multifd_channels(); i++) {
>>           MultiFDRecvParams *p = &multifd_recv_state->params[i];
>>   
>>           if (p->running) {
>> @@ -1030,7 +1030,7 @@ int multifd_load_cleanup(Error **errp)
>>               qemu_thread_join(&p->thread);
>>           }
>>       }
>> -    for (i = 0; i < migrate_multifd_channels(); i++) {
>> +    for (i = 0; i < total_multifd_channels(); i++) {
>>           MultiFDRecvParams *p = &multifd_recv_state->params[i];
>>   
>>           migration_ioc_unregister_yank(p->c);
>> @@ -1065,13 +1065,13 @@ void multifd_recv_sync_main(void)
>>       if (!migrate_use_multifd()) {
>>           return;
>>       }
>> -    for (i = 0; i < migrate_multifd_channels(); i++) {
>> +    for (i = 0; i < total_multifd_channels(); i++) {
>>           MultiFDRecvParams *p = &multifd_recv_state->params[i];
>>   
>>           trace_multifd_recv_sync_main_wait(p->id);
>>           qemu_sem_wait(&multifd_recv_state->sem_sync);
>>       }
>> -    for (i = 0; i < migrate_multifd_channels(); i++) {
>> +    for (i = 0; i < total_multifd_channels(); i++) {
>>           MultiFDRecvParams *p = &multifd_recv_state->params[i];
>>   
>>           WITH_QEMU_LOCK_GUARD(&p->mutex) {
>> @@ -1166,7 +1166,7 @@ int multifd_load_setup(Error **errp)
>>           error_setg(errp, "multifd is not supported by current protocol");
>>           return -1;
>>       }
>> -    thread_count = migrate_multifd_channels();
>> +    thread_count = total_multifd_channels();
>>       multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
>>       multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
>>       qatomic_set(&multifd_recv_state->count, 0);
>> @@ -1204,7 +1204,7 @@ int multifd_load_setup(Error **errp)
>>   
>>   bool multifd_recv_all_channels_created(void)
>>   {
>> -    int thread_count = migrate_multifd_channels();
>> +    int thread_count = total_multifd_channels();
>>   
>>       if (!migrate_use_multifd()) {
>>           return true;
>> @@ -1259,5 +1259,5 @@ bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
>>                          QEMU_THREAD_JOINABLE);
>>       qatomic_inc(&multifd_recv_state->count);
>>       return qatomic_read(&multifd_recv_state->count) ==
>> -           migrate_multifd_channels();
>> +           total_multifd_channels();
>>   }
>> diff --git a/migration/socket.c b/migration/socket.c
>> index d0cb7cc6a6..c0ac6dbbe2 100644
>> --- a/migration/socket.c
>> +++ b/migration/socket.c
>> @@ -28,9 +28,6 @@
>>   #include "trace.h"
>>   
>>   
>> -struct SocketOutgoingArgs {
>> -    SocketAddress *saddr;
>> -} outgoing_args;
>>   
>>   struct SocketArgs {
>>       struct SrcDestAddr data;
>> @@ -43,20 +40,47 @@ struct OutgoingMigrateParams {
>>       uint64_t total_multifd_channel;
>>   } outgoing_migrate_params;
>>   
>> -void socket_send_channel_create(QIOTaskFunc f, void *data)
>> +
>> +int total_multifd_channels(void)
>> +{
>> +    return outgoing_migrate_params.total_multifd_channel;
>> +}
>> +
>> +int multifd_index(int i)
>> +{
>> +    int length = outgoing_migrate_params.length;
>> +    int j = 0;
>> +    int runn_sum = 0;
>> +    while (j < length) {
>> +        runn_sum += outgoing_migrate_params.socket_args[j].multifd_channels;
>> +        if (i >= runn_sum) {
>> +            j++;
>> +        } else {
>> +            break;
>> +        }
>> +    }
>> +    return j;
>> +}
>> +
>> +void socket_send_channel_create(QIOTaskFunc f, void *data, int idx)
>>   {
>>       QIOChannelSocket *sioc = qio_channel_socket_new();
>> -    qio_channel_socket_connect_async(sioc, outgoing_args.saddr,
>> -                                     f, data, NULL, NULL, NULL);
>> +    qio_channel_socket_connect_async(sioc,
>> +                       outgoing_migrate_params.socket_args[idx].data.dst_addr,
>> +                       f, data, NULL, NULL,
>> +                       outgoing_migrate_params.socket_args[idx].data.src_addr);
>>   }
>>   
>>   int socket_send_channel_destroy(QIOChannel *send)
>>   {
>>       /* Remove channel */
>>       object_unref(OBJECT(send));
>> -    if (outgoing_args.saddr) {
>> -        qapi_free_SocketAddress(outgoing_args.saddr);
>> -        outgoing_args.saddr = NULL;
>> +    if (outgoing_migrate_params.socket_args != NULL) {
>> +        g_free(outgoing_migrate_params.socket_args);
>> +        outgoing_migrate_params.socket_args = NULL;
>> +    }
>> +    if (outgoing_migrate_params.length) {
>> +        outgoing_migrate_params.length = 0;
>>       }
>>   
>>       if (outgoing_migrate_params.socket_args != NULL) {
>> diff --git a/migration/socket.h b/migration/socket.h
>> index b9e3699167..c8b9252384 100644
>> --- a/migration/socket.h
>> +++ b/migration/socket.h
>> @@ -27,7 +27,9 @@ struct SrcDestAddr {
>>       SocketAddress *src_addr;
>>   };
>>   
>> -void socket_send_channel_create(QIOTaskFunc f, void *data);
>> +int total_multifd_channels(void);
>> +int multifd_index(int i);
>> +void socket_send_channel_create(QIOTaskFunc f, void *data, int idx);
>>   int socket_send_channel_destroy(QIOChannel *send);
>>   
>>   void socket_start_incoming_migration(const char *str, uint8_t number,
>> diff --git a/monitor/hmp-cmds.c b/monitor/hmp-cmds.c
>> index 32a6b67d5f..9a3d76d6ba 100644
>> --- a/monitor/hmp-cmds.c
>> +++ b/monitor/hmp-cmds.c
>> @@ -1281,10 +1281,6 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
>>           p->has_block_incremental = true;
>>           visit_type_bool(v, param, &p->block_incremental, &err);
>>           break;
>> -    case MIGRATION_PARAMETER_MULTIFD_CHANNELS:
>> -        p->has_multifd_channels = true;
>> -        visit_type_uint8(v, param, &p->multifd_channels, &err);
>> -        break;
>>       case MIGRATION_PARAMETER_MULTIFD_COMPRESSION:
>>           p->has_multifd_compression = true;
>>           visit_type_MultiFDCompression(v, param, &p->multifd_compression,
>> diff --git a/qapi/migration.json b/qapi/migration.json
>> index 62a7b22d19..1b1c6d01d3 100644
>> --- a/qapi/migration.json
>> +++ b/qapi/migration.json
>> @@ -877,11 +877,6 @@
>>   #                     migrated and the destination must already have access to the
>>   #                     same backing chain as was used on the source.  (since 2.10)
>>   #
>> -# @multifd-channels: Number of channels used to migrate data in
>> -#                    parallel. This is the same number that the
>> -#                    number of sockets used for migration.  The
>> -#                    default value is 2 (since 4.0)
>> -#
>>   # @xbzrle-cache-size: cache size to be used by XBZRLE migration.  It
>>   #                     needs to be a multiple of the target page size
>>   #                     and a power of 2
>> @@ -965,7 +960,6 @@
>>               '*x-checkpoint-delay': { 'type': 'uint32',
>>                                        'features': [ 'unstable' ] },
>>               '*block-incremental': 'bool',
>> -            '*multifd-channels': 'uint8',
>>               '*xbzrle-cache-size': 'size',
>>               '*max-postcopy-bandwidth': 'size',
>>               '*max-cpu-throttle': 'uint8',
>> -- 
>> 2.22.3
>>
diff mbox series

Patch

diff --git a/migration/migration.c b/migration/migration.c
index 9b0ad732e7..57dd4494b4 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -1585,9 +1585,6 @@  static void migrate_params_test_apply(MigrateSetParameters *params,
     if (params->has_block_incremental) {
         dest->block_incremental = params->block_incremental;
     }
-    if (params->has_multifd_channels) {
-        dest->multifd_channels = params->multifd_channels;
-    }
     if (params->has_multifd_compression) {
         dest->multifd_compression = params->multifd_compression;
     }
@@ -1702,9 +1699,6 @@  static void migrate_params_apply(MigrateSetParameters *params, Error **errp)
     if (params->has_block_incremental) {
         s->parameters.block_incremental = params->block_incremental;
     }
-    if (params->has_multifd_channels) {
-        s->parameters.multifd_channels = params->multifd_channels;
-    }
     if (params->has_multifd_compression) {
         s->parameters.multifd_compression = params->multifd_compression;
     }
@@ -2686,15 +2680,6 @@  bool migrate_pause_before_switchover(void)
         MIGRATION_CAPABILITY_PAUSE_BEFORE_SWITCHOVER];
 }
 
-int migrate_multifd_channels(void)
-{
-    MigrationState *s;
-
-    s = migrate_get_current();
-
-    return s->parameters.multifd_channels;
-}
-
 MultiFDCompression migrate_multifd_compression(void)
 {
     MigrationState *s;
diff --git a/migration/migration.h b/migration/migration.h
index fa8717ec9e..9464de8ef7 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -372,7 +372,6 @@  bool migrate_validate_uuid(void);
 bool migrate_auto_converge(void);
 bool migrate_use_multifd(void);
 bool migrate_pause_before_switchover(void);
-int migrate_multifd_channels(void);
 MultiFDCompression migrate_multifd_compression(void);
 int migrate_multifd_zlib_level(void);
 int migrate_multifd_zstd_level(void);
diff --git a/migration/multifd.c b/migration/multifd.c
index 9282ab6aa4..ce017436fb 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -225,7 +225,7 @@  static int multifd_recv_initial_packet(QIOChannel *c, Error **errp)
         return -1;
     }
 
-    if (msg.id > migrate_multifd_channels()) {
+    if (msg.id > total_multifd_channels()) {
         error_setg(errp, "multifd: received channel version %u "
                    "expected %u", msg.version, MULTIFD_VERSION);
         return -1;
@@ -410,8 +410,8 @@  static int multifd_send_pages(QEMUFile *f)
      * using more channels, so ensure it doesn't overflow if the
      * limit is lower now.
      */
-    next_channel %= migrate_multifd_channels();
-    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
+    next_channel %= total_multifd_channels();
+    for (i = next_channel;; i = (i + 1) % total_multifd_channels()) {
         p = &multifd_send_state->params[i];
 
         qemu_mutex_lock(&p->mutex);
@@ -422,7 +422,7 @@  static int multifd_send_pages(QEMUFile *f)
         }
         if (!p->pending_job) {
             p->pending_job++;
-            next_channel = (i + 1) % migrate_multifd_channels();
+            next_channel = (i + 1) % total_multifd_channels();
             break;
         }
         qemu_mutex_unlock(&p->mutex);
@@ -500,7 +500,7 @@  static void multifd_send_terminate_threads(Error *err)
         return;
     }
 
-    for (i = 0; i < migrate_multifd_channels(); i++) {
+    for (i = 0; i < total_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
         qemu_mutex_lock(&p->mutex);
@@ -521,14 +521,14 @@  void multifd_save_cleanup(void)
         return;
     }
     multifd_send_terminate_threads(NULL);
-    for (i = 0; i < migrate_multifd_channels(); i++) {
+    for (i = 0; i < total_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
         if (p->running) {
             qemu_thread_join(&p->thread);
         }
     }
-    for (i = 0; i < migrate_multifd_channels(); i++) {
+    for (i = 0; i < total_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
         Error *local_err = NULL;
 
@@ -594,7 +594,7 @@  int multifd_send_sync_main(QEMUFile *f)
 
     flush_zero_copy = migrate_use_zero_copy_send();
 
-    for (i = 0; i < migrate_multifd_channels(); i++) {
+    for (i = 0; i < total_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
         trace_multifd_send_sync_main_signal(p->id);
@@ -627,7 +627,7 @@  int multifd_send_sync_main(QEMUFile *f)
             }
         }
     }
-    for (i = 0; i < migrate_multifd_channels(); i++) {
+    for (i = 0; i < total_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
 
         trace_multifd_send_sync_main_wait(p->id);
@@ -903,7 +903,7 @@  int multifd_save_setup(Error **errp)
     int thread_count;
     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
     uint8_t i;
-
+    int idx;
     if (!migrate_use_multifd()) {
         return 0;
     }
@@ -912,7 +912,7 @@  int multifd_save_setup(Error **errp)
         return -1;
     }
 
-    thread_count = migrate_multifd_channels();
+    thread_count = total_multifd_channels();
     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
     multifd_send_state->pages = multifd_pages_init(page_count);
@@ -945,8 +945,8 @@  int multifd_save_setup(Error **errp)
         } else {
             p->write_flags = 0;
         }
-
-        socket_send_channel_create(multifd_new_send_channel_async, p);
+        idx = multifd_index(i);
+        socket_send_channel_create(multifd_new_send_channel_async, p, idx);
     }
 
     for (i = 0; i < thread_count; i++) {
@@ -991,7 +991,7 @@  static void multifd_recv_terminate_threads(Error *err)
         }
     }
 
-    for (i = 0; i < migrate_multifd_channels(); i++) {
+    for (i = 0; i < total_multifd_channels(); i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         qemu_mutex_lock(&p->mutex);
@@ -1017,7 +1017,7 @@  int multifd_load_cleanup(Error **errp)
         return 0;
     }
     multifd_recv_terminate_threads(NULL);
-    for (i = 0; i < migrate_multifd_channels(); i++) {
+    for (i = 0; i < total_multifd_channels(); i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         if (p->running) {
@@ -1030,7 +1030,7 @@  int multifd_load_cleanup(Error **errp)
             qemu_thread_join(&p->thread);
         }
     }
-    for (i = 0; i < migrate_multifd_channels(); i++) {
+    for (i = 0; i < total_multifd_channels(); i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         migration_ioc_unregister_yank(p->c);
@@ -1065,13 +1065,13 @@  void multifd_recv_sync_main(void)
     if (!migrate_use_multifd()) {
         return;
     }
-    for (i = 0; i < migrate_multifd_channels(); i++) {
+    for (i = 0; i < total_multifd_channels(); i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         trace_multifd_recv_sync_main_wait(p->id);
         qemu_sem_wait(&multifd_recv_state->sem_sync);
     }
-    for (i = 0; i < migrate_multifd_channels(); i++) {
+    for (i = 0; i < total_multifd_channels(); i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
 
         WITH_QEMU_LOCK_GUARD(&p->mutex) {
@@ -1166,7 +1166,7 @@  int multifd_load_setup(Error **errp)
         error_setg(errp, "multifd is not supported by current protocol");
         return -1;
     }
-    thread_count = migrate_multifd_channels();
+    thread_count = total_multifd_channels();
     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
     qatomic_set(&multifd_recv_state->count, 0);
@@ -1204,7 +1204,7 @@  int multifd_load_setup(Error **errp)
 
 bool multifd_recv_all_channels_created(void)
 {
-    int thread_count = migrate_multifd_channels();
+    int thread_count = total_multifd_channels();
 
     if (!migrate_use_multifd()) {
         return true;
@@ -1259,5 +1259,5 @@  bool multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
                        QEMU_THREAD_JOINABLE);
     qatomic_inc(&multifd_recv_state->count);
     return qatomic_read(&multifd_recv_state->count) ==
-           migrate_multifd_channels();
+           total_multifd_channels();
 }
diff --git a/migration/socket.c b/migration/socket.c
index d0cb7cc6a6..c0ac6dbbe2 100644
--- a/migration/socket.c
+++ b/migration/socket.c
@@ -28,9 +28,6 @@ 
 #include "trace.h"
 
 
-struct SocketOutgoingArgs {
-    SocketAddress *saddr;
-} outgoing_args;
 
 struct SocketArgs {
     struct SrcDestAddr data;
@@ -43,20 +40,47 @@  struct OutgoingMigrateParams {
     uint64_t total_multifd_channel;
 } outgoing_migrate_params;
 
-void socket_send_channel_create(QIOTaskFunc f, void *data)
+
+int total_multifd_channels(void)
+{
+    return outgoing_migrate_params.total_multifd_channel;
+}
+
+int multifd_index(int i)
+{
+    int length = outgoing_migrate_params.length;
+    int j = 0;
+    int runn_sum = 0;
+    while (j < length) {
+        runn_sum += outgoing_migrate_params.socket_args[j].multifd_channels;
+        if (i >= runn_sum) {
+            j++;
+        } else {
+            break;
+        }
+    }
+    return j;
+}
+
+void socket_send_channel_create(QIOTaskFunc f, void *data, int idx)
 {
     QIOChannelSocket *sioc = qio_channel_socket_new();
-    qio_channel_socket_connect_async(sioc, outgoing_args.saddr,
-                                     f, data, NULL, NULL, NULL);
+    qio_channel_socket_connect_async(sioc,
+                       outgoing_migrate_params.socket_args[idx].data.dst_addr,
+                       f, data, NULL, NULL,
+                       outgoing_migrate_params.socket_args[idx].data.src_addr);
 }
 
 int socket_send_channel_destroy(QIOChannel *send)
 {
     /* Remove channel */
     object_unref(OBJECT(send));
-    if (outgoing_args.saddr) {
-        qapi_free_SocketAddress(outgoing_args.saddr);
-        outgoing_args.saddr = NULL;
+    if (outgoing_migrate_params.socket_args != NULL) {
+        g_free(outgoing_migrate_params.socket_args);
+        outgoing_migrate_params.socket_args = NULL;
+    }
+    if (outgoing_migrate_params.length) {
+        outgoing_migrate_params.length = 0;
     }
 
     if (outgoing_migrate_params.socket_args != NULL) {
diff --git a/migration/socket.h b/migration/socket.h
index b9e3699167..c8b9252384 100644
--- a/migration/socket.h
+++ b/migration/socket.h
@@ -27,7 +27,9 @@  struct SrcDestAddr {
     SocketAddress *src_addr;
 };
 
-void socket_send_channel_create(QIOTaskFunc f, void *data);
+int total_multifd_channels(void);
+int multifd_index(int i);
+void socket_send_channel_create(QIOTaskFunc f, void *data, int idx);
 int socket_send_channel_destroy(QIOChannel *send);
 
 void socket_start_incoming_migration(const char *str, uint8_t number,
diff --git a/monitor/hmp-cmds.c b/monitor/hmp-cmds.c
index 32a6b67d5f..9a3d76d6ba 100644
--- a/monitor/hmp-cmds.c
+++ b/monitor/hmp-cmds.c
@@ -1281,10 +1281,6 @@  void hmp_migrate_set_parameter(Monitor *mon, const QDict *qdict)
         p->has_block_incremental = true;
         visit_type_bool(v, param, &p->block_incremental, &err);
         break;
-    case MIGRATION_PARAMETER_MULTIFD_CHANNELS:
-        p->has_multifd_channels = true;
-        visit_type_uint8(v, param, &p->multifd_channels, &err);
-        break;
     case MIGRATION_PARAMETER_MULTIFD_COMPRESSION:
         p->has_multifd_compression = true;
         visit_type_MultiFDCompression(v, param, &p->multifd_compression,
diff --git a/qapi/migration.json b/qapi/migration.json
index 62a7b22d19..1b1c6d01d3 100644
--- a/qapi/migration.json
+++ b/qapi/migration.json
@@ -877,11 +877,6 @@ 
 #                     migrated and the destination must already have access to the
 #                     same backing chain as was used on the source.  (since 2.10)
 #
-# @multifd-channels: Number of channels used to migrate data in
-#                    parallel. This is the same number that the
-#                    number of sockets used for migration.  The
-#                    default value is 2 (since 4.0)
-#
 # @xbzrle-cache-size: cache size to be used by XBZRLE migration.  It
 #                     needs to be a multiple of the target page size
 #                     and a power of 2
@@ -965,7 +960,6 @@ 
             '*x-checkpoint-delay': { 'type': 'uint32',
                                      'features': [ 'unstable' ] },
             '*block-incremental': 'bool',
-            '*multifd-channels': 'uint8',
             '*xbzrle-cache-size': 'size',
             '*max-postcopy-bandwidth': 'size',
             '*max-cpu-throttle': 'uint8',