diff mbox series

[v5,10/13] migration/multifd: Enable DSA offloading in multifd sender path.

Message ID 20240711220451.19780-1-yichen.wang@bytedance.com (mailing list archive)
State New, archived
Headers show
Series WIP: Use Intel DSA accelerator to offload zero page checking in multifd live migration. | expand

Commit Message

Yichen Wang July 11, 2024, 10:04 p.m. UTC
From: Hao Xiang <hao.xiang@linux.dev>

Multifd sender path gets an array of pages queued by the migration
thread. It performs zero page checking on every page in the array.
The pages are classified as either a zero page or a normal page. This
change uses Intel DSA to offload the zero page checking from CPU to
the DSA accelerator. The sender thread submits a batch of pages to DSA
hardware and waits for the DSA completion thread to signal for work
completion.

Signed-off-by: Hao Xiang <hao.xiang@linux.dev>
Signed-off-by: Yichen Wang <yichen.wang@bytedance.com>
---
 include/qemu/dsa.h            |   4 +-
 migration/migration.c         |   2 +-
 migration/multifd-zero-page.c | 100 ++++++++++++++++++++++++++++++++--
 migration/multifd.c           |  43 ++++++++++++++-
 migration/multifd.h           |   2 +-
 util/dsa.c                    |  23 ++++----
 6 files changed, 150 insertions(+), 24 deletions(-)

Comments

Fabiano Rosas July 17, 2024, 2:41 p.m. UTC | #1
Yichen Wang <yichen.wang@bytedance.com> writes:

> From: Hao Xiang <hao.xiang@linux.dev>
>
> Multifd sender path gets an array of pages queued by the migration
> thread. It performs zero page checking on every page in the array.
> The pages are classfied as either a zero page or a normal page. This
> change uses Intel DSA to offload the zero page checking from CPU to
> the DSA accelerator. The sender thread submits a batch of pages to DSA
> hardware and waits for the DSA completion thread to signal for work
> completion.
>
> Signed-off-by: Hao Xiang <hao.xiang@linux.dev>
> Signed-off-by: Yichen Wang <yichen.wang@bytedance.com>
> ---
>  include/qemu/dsa.h            |   4 +-

This patch should have no changes to dsa code. Put them in the patches
that introduce them.

>  migration/migration.c         |   2 +-
>  migration/multifd-zero-page.c | 100 ++++++++++++++++++++++++++++++++--
>  migration/multifd.c           |  43 ++++++++++++++-
>  migration/multifd.h           |   2 +-
>  util/dsa.c                    |  23 ++++----

Same with these.

>  6 files changed, 150 insertions(+), 24 deletions(-)
>
> diff --git a/include/qemu/dsa.h b/include/qemu/dsa.h
> index fd0305a7c7..a3b502ee41 100644
> --- a/include/qemu/dsa.h
> +++ b/include/qemu/dsa.h
> @@ -83,7 +83,7 @@ typedef struct QemuDsaBatchTask {
>   *
>   * @return int Zero if successful, otherwise non zero.
>   */
> -int qemu_dsa_init(const char *dsa_parameter, Error **errp);
> +int qemu_dsa_init(const strList *dsa_parameter, Error **errp);
>  
>  /**
>   * @brief Start logic to enable using DSA.
> @@ -146,7 +146,7 @@ static inline bool qemu_dsa_is_running(void)
>      return false;
>  }
>  
> -static inline int qemu_dsa_init(const char *dsa_parameter, Error **errp)
> +static inline int qemu_dsa_init(const strList *dsa_parameter, Error **errp)
>  {
>      error_setg(errp, "DSA accelerator is not enabled.");
>      return -1;
> diff --git a/migration/migration.c b/migration/migration.c
> index 3dea06d577..085395b900 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -3469,7 +3469,7 @@ static void *migration_thread(void *opaque)
>      object_ref(OBJECT(s));
>      update_iteration_initial_status(s);
>  
> -    if (!multifd_send_setup()) {
> +    if (!multifd_send_setup(&local_err)) {

This is interesting, probably more correct than what we're doing
today. But you need to hoist the error handling out of
multifd_send_setup into here. And put this in a separate patch because
it is an improvement on its own.

>          goto out;
>      }
>  
> diff --git a/migration/multifd-zero-page.c b/migration/multifd-zero-page.c

The way git generated this diff makes it hard to review. When this
happens, you can use a different algorithm such as --patience when
generating the patches. Compare git show vs. git show --patience to see
the difference.

> index e1b8370f88..ffb5611d44 100644
> --- a/migration/multifd-zero-page.c
> +++ b/migration/multifd-zero-page.c
> @@ -37,25 +37,84 @@ static void swap_page_offset(ram_addr_t *pages_offset, int a, int b)
>  }
>  
>  /**
> - * multifd_send_zero_page_detect: Perform zero page detection on all pages.
> + * zero_page_detect_cpu: Perform zero page detection using CPU.
>   *
>   * Sorts normal pages before zero pages in p->pages->offset and updates
>   * p->pages->normal_num.

Probably best to carry this part along as well. This is the public
function that people will most likely look at first.

>   *
>   * @param p A pointer to the send params.
>   */
> -void multifd_send_zero_page_detect(MultiFDSendParams *p)
> +static void zero_page_detect_cpu(MultiFDSendParams *p)
> {
>      MultiFDPages_t *pages = p->pages;
>      RAMBlock *rb = pages->block;
>      int i = 0;
>      int j = pages->num - 1;
>  
> -    if (!multifd_zero_page_enabled()) {
> -        pages->normal_num = pages->num;
> +    /*
> +     * Sort the page offset array by moving all normal pages to
> +     * the left and all zero pages to the right of the array.
> +     */
> +    while (i <= j) {
> +        uint64_t offset = pages->offset[i];
> +
> +        if (!buffer_is_zero(rb->host + offset, p->page_size)) {
> +            i++;
> +            continue;
> +        }
> +
> +        swap_page_offset(pages->offset, i, j);
> +        ram_release_page(rb->idstr, offset);
> +        j--;
> +    }
> +
> +    pages->normal_num = i;
> +}
> +
> +
> +#ifdef CONFIG_DSA_OPT
> +
> +static void swap_result(bool *results, int a, int b)
> +{
> +    bool temp;
> +
> +    if (a == b) {
>          return;
>      }
>  
> +    temp = results[a];
> +    results[a] = results[b];
> +    results[b] = temp;
> +}
> +
> +/**
> + * zero_page_detect_dsa: Perform zero page detection using
> + * Intel Data Streaming Accelerator (DSA).
> + *
> + * Sorts normal pages before zero pages in p->pages->offset and updates
> + * p->pages->normal_num.
> + *
> + * @param p A pointer to the send params.
> + */
> +static void zero_page_detect_dsa(MultiFDSendParams *p)
> +{
> +    MultiFDPages_t *pages = p->pages;

Actually use the pages variable all over instead of dereferencing
p->pages again.

> +    RAMBlock *rb = pages->block;
> +    bool *results = p->dsa_batch_task->results;

I think we had a suggestion from Peter to not carry the batch task in
the channel parameters, no?

> +
> +    for (int i = 0; i < p->pages->num; i++) {
> +        p->dsa_batch_task->addr[i] =
> +            (ram_addr_t)(rb->host + p->pages->offset[i]);
> +    }
> +
> +    buffer_is_zero_dsa_batch_sync(p->dsa_batch_task,
> +                                  (const void **)p->dsa_batch_task->addr,
> +                                  p->pages->num,
> +                                  p->page_size);
> +
> +    int i = 0;
> +    int j = pages->num - 1;
> +
>      /*
>       * Sort the page offset array by moving all normal pages to
>       * the left and all zero pages to the right of the array.
> @@ -63,11 +122,12 @@ void multifd_send_zero_page_detect(MultiFDSendParams *p)
>      while (i <= j) {
>          uint64_t offset = pages->offset[i];
>  
> -        if (!buffer_is_zero(rb->host + offset, p->page_size)) {
> +        if (!results[i]) {
>              i++;
>              continue;
>          }
>  
> +        swap_result(results, i, j);
>          swap_page_offset(pages->offset, i, j);
>          ram_release_page(rb->idstr, offset);
>          j--;
> @@ -76,6 +136,15 @@ void multifd_send_zero_page_detect(MultiFDSendParams *p)
>      pages->normal_num = i;
>  }
>  
> +#else
> +
> +static void zero_page_detect_dsa(MultiFDSendParams *p)
> +{
> +    exit(1);

g_assert_not_reached();

> +}
> +
> +#endif
> +
>  void multifd_recv_zero_page_process(MultiFDRecvParams *p)
>  {
>      for (int i = 0; i < p->zero_num; i++) {
> @@ -87,3 +156,24 @@ void multifd_recv_zero_page_process(MultiFDRecvParams *p)
>          }
>      }
>  }
> +
> +/**
> + * multifd_send_zero_page_detect: Perform zero page detection on all pages.
> + *
> + * @param p A pointer to the send params.
> + */
> +void multifd_send_zero_page_detect(MultiFDSendParams *p)
> +{
> +    MultiFDPages_t *pages = p->pages;
> +
> +    if (!multifd_zero_page_enabled()) {
> +        pages->normal_num = pages->num;
> +        return;
> +    }
> +
> +    if (qemu_dsa_is_running()) {
> +        zero_page_detect_dsa(p);
> +    } else {
> +        zero_page_detect_cpu(p);
> +    }
> +}
> diff --git a/migration/multifd.c b/migration/multifd.c
> index 6f8edd4b6a..014fee757a 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -817,6 +817,32 @@ static void multifd_send_cleanup_state(void)
>      multifd_send_state = NULL;
>  }
>  
> +static bool multifd_dsa_setup(MigrationState *s, const char *role, Error **errp)

You don't need MigrationState here. You can call the function only from
multifd_send_setup() and use migrate_zero_page_detection() to check for
DSA.

> +{
> +    /*
> +     * Only setup DSA when needed. Currently, DSA is only used for zero page
> +     * detection, which is only needed on sender side.
> +     */
> +    if (!s ||
> +        s->parameters.zero_page_detection != ZERO_PAGE_DETECTION_DSA_ACCEL) {
> +        return true;
> +    }
> +
> +    const strList *dsa_parameter = migrate_dsa_accel_path();
> +    if (qemu_dsa_init(dsa_parameter, errp)) {
> +        error_setg(errp, "multifd: %s failed to initialize DSA.", role);
> +        return false;
> +    }
> +    qemu_dsa_start();
> +
> +    return true;
> +}
> +
> +static void multifd_dsa_cleanup(void)
> +{
> +    qemu_dsa_cleanup();
> +}

Hmm, these two functions seem to fit better in multifd-zero-page.c.

> +
>  void multifd_send_shutdown(void)
>  {
>      int i;
> @@ -827,6 +853,8 @@ void multifd_send_shutdown(void)
>  
>      multifd_send_terminate_threads();
>  
> +    multifd_dsa_cleanup();
> +
>      for (i = 0; i < migrate_multifd_channels(); i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>          Error *local_err = NULL;
> @@ -1156,7 +1184,7 @@ static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
>      return true;
>  }
>  
> -bool multifd_send_setup(void)
> +bool multifd_send_setup(Error **errp)
>  {
>      MigrationState *s = migrate_get_current();
>      Error *local_err = NULL;

Remove this and use errp instead everywhere.

> @@ -1169,6 +1197,10 @@ bool multifd_send_setup(void)
>          return true;
>      }
>  
> +    if (!multifd_dsa_setup(s, "Sender", errp)) {
> +        return false;
> +    }
> +
>      thread_count = migrate_multifd_channels();
>      multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
>      multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
> @@ -1395,6 +1427,7 @@ void multifd_recv_cleanup(void)
>              qemu_thread_join(&p->thread);
>          }
>      }
> +    multifd_dsa_cleanup();
>      for (i = 0; i < migrate_multifd_channels(); i++) {
>          multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
>      }
> @@ -1570,6 +1603,7 @@ int multifd_recv_setup(Error **errp)
>      uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
>      bool use_packets = multifd_use_packets();
>      uint8_t i;
> +    int ret;
>  
>      /*
>       * Return successfully if multiFD recv state is already initialised
> @@ -1579,6 +1613,10 @@ int multifd_recv_setup(Error **errp)
>          return 0;
>      }
>  
> +    if (!multifd_dsa_setup(NULL, "Receiver", errp)) {
> +        return -1;
> +    }

Is there a reason to call this here?

> +
>      thread_count = migrate_multifd_channels();
>      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
>      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
> @@ -1617,13 +1655,12 @@ int multifd_recv_setup(Error **errp)
>  
>      for (i = 0; i < thread_count; i++) {
>          MultiFDRecvParams *p = &multifd_recv_state->params[i];
> -        int ret;
> -

This is a separate cleanup patch.

>          ret = multifd_recv_state->ops->recv_setup(p, errp);
>          if (ret) {
>              return ret;
>          }
>      }
> +

Avoid introducing extra lines for no reason, this leads to git conflicts
sometimes.

>      return 0;
>  }
>  
> diff --git a/migration/multifd.h b/migration/multifd.h
> index 027f57bf4e..871e3aa063 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -18,7 +18,7 @@
>  
>  typedef struct MultiFDRecvData MultiFDRecvData;
>  
> -bool multifd_send_setup(void);
> +bool multifd_send_setup(Error **errp);
>  void multifd_send_shutdown(void);
>  void multifd_send_channel_created(void);
>  int multifd_recv_setup(Error **errp);
> diff --git a/util/dsa.c b/util/dsa.c
> index 5aba1ae23a..44b1130a51 100644
> --- a/util/dsa.c
> +++ b/util/dsa.c
> @@ -116,27 +116,27 @@ dsa_device_cleanup(QemuDsaDevice *instance)
>   */
>  static int
>  dsa_device_group_init(QemuDsaDeviceGroup *group,
> -                      const char *dsa_parameter,
> +                      const strList *dsa_parameter,
>                        Error **errp)
>  {
> -    if (dsa_parameter == NULL || strlen(dsa_parameter) == 0) {
> -        return 0;
> +    if (dsa_parameter == NULL) {
> +        /* HACKING ALERT. */
> +        /* return 0; */
> +        dsa_parameter = &(strList) {
> +            .value = (char *)"/dev/dsa/wq0.0", .next = NULL
> +        };
>      }
>  
>      int ret = 0;
> -    char *local_dsa_parameter = g_strdup(dsa_parameter);
>      const char *dsa_path[MAX_DSA_DEVICES];
>      int num_dsa_devices = 0;
> -    char delim[2] = " ";
>  
> -    char *current_dsa_path = strtok(local_dsa_parameter, delim);
> -
> -    while (current_dsa_path != NULL) {
> -        dsa_path[num_dsa_devices++] = current_dsa_path;
> +    while (dsa_parameter) {
> +        dsa_path[num_dsa_devices++] = dsa_parameter->value;
>          if (num_dsa_devices == MAX_DSA_DEVICES) {
>              break;
>          }
> -        current_dsa_path = strtok(NULL, delim);
> +        dsa_parameter = dsa_parameter->next;
>      }
>  
>      group->dsa_devices =
> @@ -161,7 +161,6 @@ dsa_device_group_init(QemuDsaDeviceGroup *group,
>      }
>  
>  exit:
> -    g_free(local_dsa_parameter);
>      return ret;
>  }
>  
> @@ -718,7 +717,7 @@ dsa_globals_init(void)
>   *
>   * @return int Zero if successful, otherwise non zero.
>   */
> -int qemu_dsa_init(const char *dsa_parameter, Error **errp)
> +int qemu_dsa_init(const strList *dsa_parameter, Error **errp)
>  {
>      dsa_globals_init();
Yichen Wang Sept. 9, 2024, 11:31 p.m. UTC | #2
On Wed, Jul 17, 2024 at 7:41 AM Fabiano Rosas <farosas@suse.de> wrote:
>
> Yichen Wang <yichen.wang@bytedance.com> writes:
>
> > From: Hao Xiang <hao.xiang@linux.dev>
> >
> > Multifd sender path gets an array of pages queued by the migration
> > thread. It performs zero page checking on every page in the array.
> > The pages are classfied as either a zero page or a normal page. This
> > change uses Intel DSA to offload the zero page checking from CPU to
> > the DSA accelerator. The sender thread submits a batch of pages to DSA
> > hardware and waits for the DSA completion thread to signal for work
> > completion.
> >
> > Signed-off-by: Hao Xiang <hao.xiang@linux.dev>
> > Signed-off-by: Yichen Wang <yichen.wang@bytedance.com>
> > ---
> >  include/qemu/dsa.h            |   4 +-
>
> This patch should have no changes to dsa code. Put them in the patches
> that introduce them.
>
> >  migration/migration.c         |   2 +-
> >  migration/multifd-zero-page.c | 100 ++++++++++++++++++++++++++++++++--
> >  migration/multifd.c           |  43 ++++++++++++++-
> >  migration/multifd.h           |   2 +-
> >  util/dsa.c                    |  23 ++++----
>
> Same with these.
>
> >  6 files changed, 150 insertions(+), 24 deletions(-)
> >
> > diff --git a/include/qemu/dsa.h b/include/qemu/dsa.h
> > index fd0305a7c7..a3b502ee41 100644
> > --- a/include/qemu/dsa.h
> > +++ b/include/qemu/dsa.h
> > @@ -83,7 +83,7 @@ typedef struct QemuDsaBatchTask {
> >   *
> >   * @return int Zero if successful, otherwise non zero.
> >   */
> > -int qemu_dsa_init(const char *dsa_parameter, Error **errp);
> > +int qemu_dsa_init(const strList *dsa_parameter, Error **errp);
> >
> >  /**
> >   * @brief Start logic to enable using DSA.
> > @@ -146,7 +146,7 @@ static inline bool qemu_dsa_is_running(void)
> >      return false;
> >  }
> >
> > -static inline int qemu_dsa_init(const char *dsa_parameter, Error **errp)
> > +static inline int qemu_dsa_init(const strList *dsa_parameter, Error **errp)
> >  {
> >      error_setg(errp, "DSA accelerator is not enabled.");
> >      return -1;
> > diff --git a/migration/migration.c b/migration/migration.c
> > index 3dea06d577..085395b900 100644
> > --- a/migration/migration.c
> > +++ b/migration/migration.c
> > @@ -3469,7 +3469,7 @@ static void *migration_thread(void *opaque)
> >      object_ref(OBJECT(s));
> >      update_iteration_initial_status(s);
> >
> > -    if (!multifd_send_setup()) {
> > +    if (!multifd_send_setup(&local_err)) {
>
> This is interesting, probably more correct than what we're doing
> today. But you need to hoist the error handling out of
> multifd_send_setup into here. And put this in a separate patch because
> it is an improvement on its own.
>
> >          goto out;
> >      }
> >
> > diff --git a/migration/multifd-zero-page.c b/migration/multifd-zero-page.c
>
> The way git generated this diff makes it hard to review. When this
> happens, you can use a different algorithm such as --patience when
> generating the patches. Compare git show vs. git show --patience to see
> the difference.
>

I tried with both --patience and default, it looks the same. So the
code is basically to implement a new multifd_send_zero_page_detect()
which calls zero_page_detect_cpu() or zero_page_detect_dsa() based on
the configuration.

> > index e1b8370f88..ffb5611d44 100644
> > --- a/migration/multifd-zero-page.c
> > +++ b/migration/multifd-zero-page.c
> > @@ -37,25 +37,84 @@ static void swap_page_offset(ram_addr_t *pages_offset, int a, int b)
> >  }
> >
> >  /**
> > - * multifd_send_zero_page_detect: Perform zero page detection on all pages.
> > + * zero_page_detect_cpu: Perform zero page detection using CPU.
> >   *
> >   * Sorts normal pages before zero pages in p->pages->offset and updates
> >   * p->pages->normal_num.
>
> Probably best to carry this part along as well. This is the public
> function that people will most likely look at first.
>
> >   *
> >   * @param p A pointer to the send params.
> >   */
> > -void multifd_send_zero_page_detect(MultiFDSendParams *p)
> > +static void zero_page_detect_cpu(MultiFDSendParams *p)
> > {
> >      MultiFDPages_t *pages = p->pages;
> >      RAMBlock *rb = pages->block;
> >      int i = 0;
> >      int j = pages->num - 1;
> >
> > -    if (!multifd_zero_page_enabled()) {
> > -        pages->normal_num = pages->num;
> > +    /*
> > +     * Sort the page offset array by moving all normal pages to
> > +     * the left and all zero pages to the right of the array.
> > +     */
> > +    while (i <= j) {
> > +        uint64_t offset = pages->offset[i];
> > +
> > +        if (!buffer_is_zero(rb->host + offset, p->page_size)) {
> > +            i++;
> > +            continue;
> > +        }
> > +
> > +        swap_page_offset(pages->offset, i, j);
> > +        ram_release_page(rb->idstr, offset);
> > +        j--;
> > +    }
> > +
> > +    pages->normal_num = i;
> > +}
> > +
> > +
> > +#ifdef CONFIG_DSA_OPT
> > +
> > +static void swap_result(bool *results, int a, int b)
> > +{
> > +    bool temp;
> > +
> > +    if (a == b) {
> >          return;
> >      }
> >
> > +    temp = results[a];
> > +    results[a] = results[b];
> > +    results[b] = temp;
> > +}
> > +
> > +/**
> > + * zero_page_detect_dsa: Perform zero page detection using
> > + * Intel Data Streaming Accelerator (DSA).
> > + *
> > + * Sorts normal pages before zero pages in p->pages->offset and updates
> > + * p->pages->normal_num.
> > + *
> > + * @param p A pointer to the send params.
> > + */
> > +static void zero_page_detect_dsa(MultiFDSendParams *p)
> > +{
> > +    MultiFDPages_t *pages = p->pages;
>
> Actually use the pages variable all over instead of dereferencing
> p->pages again.
>
> > +    RAMBlock *rb = pages->block;
> > +    bool *results = p->dsa_batch_task->results;
>
> I think we had a suggestion from Peter to not carry the batch task in
> the channel parameters, no?
>

Yes, I saw that. I followed his idea and used a much more concise data
structure. We will still need to carry the task data structure in
MultiFDSendParams, but that is a pointer to a single data structure.
And I also moved some DSA specific function to their own .c files.
Will post the in my next version.

> > +
> > +    for (int i = 0; i < p->pages->num; i++) {
> > +        p->dsa_batch_task->addr[i] =
> > +            (ram_addr_t)(rb->host + p->pages->offset[i]);
> > +    }
> > +
> > +    buffer_is_zero_dsa_batch_sync(p->dsa_batch_task,
> > +                                  (const void **)p->dsa_batch_task->addr,
> > +                                  p->pages->num,
> > +                                  p->page_size);
> > +
> > +    int i = 0;
> > +    int j = pages->num - 1;
> > +
> >      /*
> >       * Sort the page offset array by moving all normal pages to
> >       * the left and all zero pages to the right of the array.
> > @@ -63,11 +122,12 @@ void multifd_send_zero_page_detect(MultiFDSendParams *p)
> >      while (i <= j) {
> >          uint64_t offset = pages->offset[i];
> >
> > -        if (!buffer_is_zero(rb->host + offset, p->page_size)) {
> > +        if (!results[i]) {
> >              i++;
> >              continue;
> >          }
> >
> > +        swap_result(results, i, j);
> >          swap_page_offset(pages->offset, i, j);
> >          ram_release_page(rb->idstr, offset);
> >          j--;
> > @@ -76,6 +136,15 @@ void multifd_send_zero_page_detect(MultiFDSendParams *p)
> >      pages->normal_num = i;
> >  }
> >
> > +#else
> > +
> > +static void zero_page_detect_dsa(MultiFDSendParams *p)
> > +{
> > +    exit(1);
>
> g_assert_not_reached();
>
> > +}
> > +
> > +#endif
> > +
> >  void multifd_recv_zero_page_process(MultiFDRecvParams *p)
> >  {
> >      for (int i = 0; i < p->zero_num; i++) {
> > @@ -87,3 +156,24 @@ void multifd_recv_zero_page_process(MultiFDRecvParams *p)
> >          }
> >      }
> >  }
> > +
> > +/**
> > + * multifd_send_zero_page_detect: Perform zero page detection on all pages.
> > + *
> > + * @param p A pointer to the send params.
> > + */
> > +void multifd_send_zero_page_detect(MultiFDSendParams *p)
> > +{
> > +    MultiFDPages_t *pages = p->pages;
> > +
> > +    if (!multifd_zero_page_enabled()) {
> > +        pages->normal_num = pages->num;
> > +        return;
> > +    }
> > +
> > +    if (qemu_dsa_is_running()) {
> > +        zero_page_detect_dsa(p);
> > +    } else {
> > +        zero_page_detect_cpu(p);
> > +    }
> > +}
> > diff --git a/migration/multifd.c b/migration/multifd.c
> > index 6f8edd4b6a..014fee757a 100644
> > --- a/migration/multifd.c
> > +++ b/migration/multifd.c
> > @@ -817,6 +817,32 @@ static void multifd_send_cleanup_state(void)
> >      multifd_send_state = NULL;
> >  }
> >
> > +static bool multifd_dsa_setup(MigrationState *s, const char *role, Error **errp)
>
> You don't need MigrationState here. You can call the function only from
> multifd_send_setup() and use migrate_zero_page_detection() to check for
> DSA.
>

Removed and refactored this part.

> > +{
> > +    /*
> > +     * Only setup DSA when needed. Currently, DSA is only used for zero page
> > +     * detection, which is only needed on sender side.
> > +     */
> > +    if (!s ||
> > +        s->parameters.zero_page_detection != ZERO_PAGE_DETECTION_DSA_ACCEL) {
> > +        return true;
> > +    }
> > +
> > +    const strList *dsa_parameter = migrate_dsa_accel_path();
> > +    if (qemu_dsa_init(dsa_parameter, errp)) {
> > +        error_setg(errp, "multifd: %s failed to initialize DSA.", role);
> > +        return false;
> > +    }
> > +    qemu_dsa_start();
> > +
> > +    return true;
> > +}
> > +
> > +static void multifd_dsa_cleanup(void)
> > +{
> > +    qemu_dsa_cleanup();
> > +}
>
> Hmm, these two functions seem to fit better in multifd-zero-page.c.
>
> > +
> >  void multifd_send_shutdown(void)
> >  {
> >      int i;
> > @@ -827,6 +853,8 @@ void multifd_send_shutdown(void)
> >
> >      multifd_send_terminate_threads();
> >
> > +    multifd_dsa_cleanup();
> > +
> >      for (i = 0; i < migrate_multifd_channels(); i++) {
> >          MultiFDSendParams *p = &multifd_send_state->params[i];
> >          Error *local_err = NULL;
> > @@ -1156,7 +1184,7 @@ static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
> >      return true;
> >  }
> >
> > -bool multifd_send_setup(void)
> > +bool multifd_send_setup(Error **errp)
> >  {
> >      MigrationState *s = migrate_get_current();
> >      Error *local_err = NULL;
>
> Remove this and use errp instead everywhere.
>
> > @@ -1169,6 +1197,10 @@ bool multifd_send_setup(void)
> >          return true;
> >      }
> >
> > +    if (!multifd_dsa_setup(s, "Sender", errp)) {
> > +        return false;
> > +    }
> > +
> >      thread_count = migrate_multifd_channels();
> >      multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
> >      multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
> > @@ -1395,6 +1427,7 @@ void multifd_recv_cleanup(void)
> >              qemu_thread_join(&p->thread);
> >          }
> >      }
> > +    multifd_dsa_cleanup();
> >      for (i = 0; i < migrate_multifd_channels(); i++) {
> >          multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
> >      }
> > @@ -1570,6 +1603,7 @@ int multifd_recv_setup(Error **errp)
> >      uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
> >      bool use_packets = multifd_use_packets();
> >      uint8_t i;
> > +    int ret;
> >
> >      /*
> >       * Return successfully if multiFD recv state is already initialised
> > @@ -1579,6 +1613,10 @@ int multifd_recv_setup(Error **errp)
> >          return 0;
> >      }
> >
> > +    if (!multifd_dsa_setup(NULL, "Receiver", errp)) {
> > +        return -1;
> > +    }
>
> Is there a reason to call this here?
>

Removed.

> > +
> >      thread_count = migrate_multifd_channels();
> >      multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
> >      multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
> > @@ -1617,13 +1655,12 @@ int multifd_recv_setup(Error **errp)
> >
> >      for (i = 0; i < thread_count; i++) {
> >          MultiFDRecvParams *p = &multifd_recv_state->params[i];
> > -        int ret;
> > -
>
> This is a separate cleanup patch.
>
> >          ret = multifd_recv_state->ops->recv_setup(p, errp);
> >          if (ret) {
> >              return ret;
> >          }
> >      }
> > +
>
> Avoid introducing extra lines for no reason, this leads to git conflicts
> sometimes.
>
> >      return 0;
> >  }
> >
> > diff --git a/migration/multifd.h b/migration/multifd.h
> > index 027f57bf4e..871e3aa063 100644
> > --- a/migration/multifd.h
> > +++ b/migration/multifd.h
> > @@ -18,7 +18,7 @@
> >
> >  typedef struct MultiFDRecvData MultiFDRecvData;
> >
> > -bool multifd_send_setup(void);
> > +bool multifd_send_setup(Error **errp);
> >  void multifd_send_shutdown(void);
> >  void multifd_send_channel_created(void);
> >  int multifd_recv_setup(Error **errp);
> > diff --git a/util/dsa.c b/util/dsa.c
> > index 5aba1ae23a..44b1130a51 100644
> > --- a/util/dsa.c
> > +++ b/util/dsa.c
> > @@ -116,27 +116,27 @@ dsa_device_cleanup(QemuDsaDevice *instance)
> >   */
> >  static int
> >  dsa_device_group_init(QemuDsaDeviceGroup *group,
> > -                      const char *dsa_parameter,
> > +                      const strList *dsa_parameter,
> >                        Error **errp)
> >  {
> > -    if (dsa_parameter == NULL || strlen(dsa_parameter) == 0) {
> > -        return 0;
> > +    if (dsa_parameter == NULL) {
> > +        /* HACKING ALERT. */
> > +        /* return 0; */
> > +        dsa_parameter = &(strList) {
> > +            .value = (char *)"/dev/dsa/wq0.0", .next = NULL
> > +        };
> >      }
> >
> >      int ret = 0;
> > -    char *local_dsa_parameter = g_strdup(dsa_parameter);
> >      const char *dsa_path[MAX_DSA_DEVICES];
> >      int num_dsa_devices = 0;
> > -    char delim[2] = " ";
> >
> > -    char *current_dsa_path = strtok(local_dsa_parameter, delim);
> > -
> > -    while (current_dsa_path != NULL) {
> > -        dsa_path[num_dsa_devices++] = current_dsa_path;
> > +    while (dsa_parameter) {
> > +        dsa_path[num_dsa_devices++] = dsa_parameter->value;
> >          if (num_dsa_devices == MAX_DSA_DEVICES) {
> >              break;
> >          }
> > -        current_dsa_path = strtok(NULL, delim);
> > +        dsa_parameter = dsa_parameter->next;
> >      }
> >
> >      group->dsa_devices =
> > @@ -161,7 +161,6 @@ dsa_device_group_init(QemuDsaDeviceGroup *group,
> >      }
> >
> >  exit:
> > -    g_free(local_dsa_parameter);
> >      return ret;
> >  }
> >
> > @@ -718,7 +717,7 @@ dsa_globals_init(void)
> >   *
> >   * @return int Zero if successful, otherwise non zero.
> >   */
> > -int qemu_dsa_init(const char *dsa_parameter, Error **errp)
> > +int qemu_dsa_init(const strList *dsa_parameter, Error **errp)
> >  {
> >      dsa_globals_init();
diff mbox series

Patch

diff --git a/include/qemu/dsa.h b/include/qemu/dsa.h
index fd0305a7c7..a3b502ee41 100644
--- a/include/qemu/dsa.h
+++ b/include/qemu/dsa.h
@@ -83,7 +83,7 @@  typedef struct QemuDsaBatchTask {
  *
  * @return int Zero if successful, otherwise non zero.
  */
-int qemu_dsa_init(const char *dsa_parameter, Error **errp);
+int qemu_dsa_init(const strList *dsa_parameter, Error **errp);
 
 /**
  * @brief Start logic to enable using DSA.
@@ -146,7 +146,7 @@  static inline bool qemu_dsa_is_running(void)
     return false;
 }
 
-static inline int qemu_dsa_init(const char *dsa_parameter, Error **errp)
+static inline int qemu_dsa_init(const strList *dsa_parameter, Error **errp)
 {
     error_setg(errp, "DSA accelerator is not enabled.");
     return -1;
diff --git a/migration/migration.c b/migration/migration.c
index 3dea06d577..085395b900 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -3469,7 +3469,7 @@  static void *migration_thread(void *opaque)
     object_ref(OBJECT(s));
     update_iteration_initial_status(s);
 
-    if (!multifd_send_setup()) {
+    if (!multifd_send_setup(&local_err)) {
         goto out;
     }
 
diff --git a/migration/multifd-zero-page.c b/migration/multifd-zero-page.c
index e1b8370f88..ffb5611d44 100644
--- a/migration/multifd-zero-page.c
+++ b/migration/multifd-zero-page.c
@@ -37,25 +37,84 @@  static void swap_page_offset(ram_addr_t *pages_offset, int a, int b)
 }
 
 /**
- * multifd_send_zero_page_detect: Perform zero page detection on all pages.
+ * zero_page_detect_cpu: Perform zero page detection using CPU.
  *
  * Sorts normal pages before zero pages in p->pages->offset and updates
  * p->pages->normal_num.
  *
  * @param p A pointer to the send params.
  */
-void multifd_send_zero_page_detect(MultiFDSendParams *p)
+static void zero_page_detect_cpu(MultiFDSendParams *p)
 {
     MultiFDPages_t *pages = p->pages;
     RAMBlock *rb = pages->block;
     int i = 0;
     int j = pages->num - 1;
 
-    if (!multifd_zero_page_enabled()) {
-        pages->normal_num = pages->num;
+    /*
+     * Sort the page offset array by moving all normal pages to
+     * the left and all zero pages to the right of the array.
+     */
+    while (i <= j) {
+        uint64_t offset = pages->offset[i];
+
+        if (!buffer_is_zero(rb->host + offset, p->page_size)) {
+            i++;
+            continue;
+        }
+
+        swap_page_offset(pages->offset, i, j);
+        ram_release_page(rb->idstr, offset);
+        j--;
+    }
+
+    pages->normal_num = i;
+}
+
+
+#ifdef CONFIG_DSA_OPT
+
+static void swap_result(bool *results, int a, int b)
+{
+    bool temp;
+
+    if (a == b) {
         return;
     }
 
+    temp = results[a];
+    results[a] = results[b];
+    results[b] = temp;
+}
+
+/**
+ * zero_page_detect_dsa: Perform zero page detection using
+ * Intel Data Streaming Accelerator (DSA).
+ *
+ * Sorts normal pages before zero pages in p->pages->offset and updates
+ * p->pages->normal_num.
+ *
+ * @param p A pointer to the send params.
+ */
+static void zero_page_detect_dsa(MultiFDSendParams *p)
+{
+    MultiFDPages_t *pages = p->pages;
+    RAMBlock *rb = pages->block;
+    bool *results = p->dsa_batch_task->results;
+
+    for (int i = 0; i < p->pages->num; i++) {
+        p->dsa_batch_task->addr[i] =
+            (ram_addr_t)(rb->host + p->pages->offset[i]);
+    }
+
+    buffer_is_zero_dsa_batch_sync(p->dsa_batch_task,
+                                  (const void **)p->dsa_batch_task->addr,
+                                  p->pages->num,
+                                  p->page_size);
+
+    int i = 0;
+    int j = pages->num - 1;
+
     /*
      * Sort the page offset array by moving all normal pages to
      * the left and all zero pages to the right of the array.
@@ -63,11 +122,12 @@  void multifd_send_zero_page_detect(MultiFDSendParams *p)
     while (i <= j) {
         uint64_t offset = pages->offset[i];
 
-        if (!buffer_is_zero(rb->host + offset, p->page_size)) {
+        if (!results[i]) {
             i++;
             continue;
         }
 
+        swap_result(results, i, j);
         swap_page_offset(pages->offset, i, j);
         ram_release_page(rb->idstr, offset);
         j--;
@@ -76,6 +136,15 @@  void multifd_send_zero_page_detect(MultiFDSendParams *p)
     pages->normal_num = i;
 }
 
+#else
+
+static void zero_page_detect_dsa(MultiFDSendParams *p)
+{
+    g_assert_not_reached();
+}
+
+#endif
+
 void multifd_recv_zero_page_process(MultiFDRecvParams *p)
 {
     for (int i = 0; i < p->zero_num; i++) {
@@ -87,3 +156,24 @@  void multifd_recv_zero_page_process(MultiFDRecvParams *p)
         }
     }
 }
+
+/**
+ * multifd_send_zero_page_detect: Perform zero page detection on all pages.
+ *
+ * @param p A pointer to the send params.
+ */
+void multifd_send_zero_page_detect(MultiFDSendParams *p)
+{
+    MultiFDPages_t *pages = p->pages;
+
+    if (!multifd_zero_page_enabled()) {
+        pages->normal_num = pages->num;
+        return;
+    }
+
+    if (qemu_dsa_is_running()) {
+        zero_page_detect_dsa(p);
+    } else {
+        zero_page_detect_cpu(p);
+    }
+}
diff --git a/migration/multifd.c b/migration/multifd.c
index 6f8edd4b6a..014fee757a 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -817,6 +817,32 @@  static void multifd_send_cleanup_state(void)
     multifd_send_state = NULL;
 }
 
+static bool multifd_dsa_setup(MigrationState *s, const char *role, Error **errp)
+{
+    /*
+     * Only setup DSA when needed. Currently, DSA is only used for zero page
+     * detection, which is only needed on sender side.
+     */
+    if (!s ||
+        s->parameters.zero_page_detection != ZERO_PAGE_DETECTION_DSA_ACCEL) {
+        return true;
+    }
+
+    const strList *dsa_parameter = migrate_dsa_accel_path();
+    if (qemu_dsa_init(dsa_parameter, errp)) {
+        error_prepend(errp, "multifd: %s failed to initialize DSA: ", role);
+        return false;
+    }
+    qemu_dsa_start();
+
+    return true;
+}
+
+static void multifd_dsa_cleanup(void)
+{
+    qemu_dsa_cleanup();
+}
+
 void multifd_send_shutdown(void)
 {
     int i;
@@ -827,6 +853,8 @@  void multifd_send_shutdown(void)
 
     multifd_send_terminate_threads();
 
+    multifd_dsa_cleanup();
+
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
         Error *local_err = NULL;
@@ -1156,7 +1184,7 @@  static bool multifd_new_send_channel_create(gpointer opaque, Error **errp)
     return true;
 }
 
-bool multifd_send_setup(void)
+bool multifd_send_setup(Error **errp)
 {
     MigrationState *s = migrate_get_current();
     Error *local_err = NULL;
@@ -1169,6 +1197,10 @@  bool multifd_send_setup(void)
         return true;
     }
 
+    if (!multifd_dsa_setup(s, "Sender", errp)) {
+        return false;
+    }
+
     thread_count = migrate_multifd_channels();
     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
@@ -1395,6 +1427,7 @@  void multifd_recv_cleanup(void)
             qemu_thread_join(&p->thread);
         }
     }
+    multifd_dsa_cleanup();
     for (i = 0; i < migrate_multifd_channels(); i++) {
         multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
     }
@@ -1570,6 +1603,7 @@  int multifd_recv_setup(Error **errp)
     uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
     bool use_packets = multifd_use_packets();
     uint8_t i;
+    int ret;
 
     /*
      * Return successfully if multiFD recv state is already initialised
@@ -1579,6 +1613,10 @@  int multifd_recv_setup(Error **errp)
         return 0;
     }
 
+    if (!multifd_dsa_setup(NULL, "Receiver", errp)) {
+        return -1;
+    }
+
     thread_count = migrate_multifd_channels();
     multifd_recv_state = g_malloc0(sizeof(*multifd_recv_state));
     multifd_recv_state->params = g_new0(MultiFDRecvParams, thread_count);
@@ -1617,13 +1655,12 @@  int multifd_recv_setup(Error **errp)
 
     for (i = 0; i < thread_count; i++) {
         MultiFDRecvParams *p = &multifd_recv_state->params[i];
-        int ret;
-
         ret = multifd_recv_state->ops->recv_setup(p, errp);
         if (ret) {
             return ret;
         }
     }
+
     return 0;
 }
 
diff --git a/migration/multifd.h b/migration/multifd.h
index 027f57bf4e..871e3aa063 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -18,7 +18,7 @@ 
 
 typedef struct MultiFDRecvData MultiFDRecvData;
 
-bool multifd_send_setup(void);
+bool multifd_send_setup(Error **errp);
 void multifd_send_shutdown(void);
 void multifd_send_channel_created(void);
 int multifd_recv_setup(Error **errp);
diff --git a/util/dsa.c b/util/dsa.c
index 5aba1ae23a..44b1130a51 100644
--- a/util/dsa.c
+++ b/util/dsa.c
@@ -116,27 +116,27 @@  dsa_device_cleanup(QemuDsaDevice *instance)
  */
 static int
 dsa_device_group_init(QemuDsaDeviceGroup *group,
-                      const char *dsa_parameter,
+                      const strList *dsa_parameter,
                       Error **errp)
 {
-    if (dsa_parameter == NULL || strlen(dsa_parameter) == 0) {
-        return 0;
+    if (dsa_parameter == NULL) {
+        /* FIXME(WIP): hack — fall back to a hardcoded DSA work queue */
+        /* instead of the original "return 0;"; remove before merging. */
+        dsa_parameter = &(strList) {
+            .value = (char *)"/dev/dsa/wq0.0", .next = NULL
+        };
     }
 
     int ret = 0;
-    char *local_dsa_parameter = g_strdup(dsa_parameter);
     const char *dsa_path[MAX_DSA_DEVICES];
     int num_dsa_devices = 0;
-    char delim[2] = " ";
 
-    char *current_dsa_path = strtok(local_dsa_parameter, delim);
-
-    while (current_dsa_path != NULL) {
-        dsa_path[num_dsa_devices++] = current_dsa_path;
+    while (dsa_parameter) {
+        dsa_path[num_dsa_devices++] = dsa_parameter->value;
         if (num_dsa_devices == MAX_DSA_DEVICES) {
             break;
         }
-        current_dsa_path = strtok(NULL, delim);
+        dsa_parameter = dsa_parameter->next;
     }
 
     group->dsa_devices =
@@ -161,7 +161,6 @@  dsa_device_group_init(QemuDsaDeviceGroup *group,
     }
 
 exit:
-    g_free(local_dsa_parameter);
     return ret;
 }
 
@@ -718,7 +717,7 @@  dsa_globals_init(void)
  *
  * @return int Zero if successful, otherwise non zero.
  */
-int qemu_dsa_init(const char *dsa_parameter, Error **errp)
+int qemu_dsa_init(const strList *dsa_parameter, Error **errp)
 {
     dsa_globals_init();