diff mbox series

[v6,5/7] migration/multifd: implement initialization of qpl compression

Message ID 20240505165751.2392198-6-yuan1.liu@intel.com (mailing list archive)
State New
Headers show
Series Live Migration With IAA | expand

Commit Message

Liu, Yuan1 May 5, 2024, 4:57 p.m. UTC
the qpl initialization includes memory allocation for compressed
data and the qpl job initialization.

the qpl job initialization will check if the In-Memory Analytics
Accelerator(IAA) device is available and use the IAA device first.
If the platform does not have IAA device or the IAA device is not
available, the qpl compression will fallback to the software path.

Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
---
 migration/multifd-qpl.c | 272 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 271 insertions(+), 1 deletion(-)

Comments

Fabiano Rosas May 10, 2024, 8:45 p.m. UTC | #1
Yuan Liu <yuan1.liu@intel.com> writes:

> the qpl initialization includes memory allocation for compressed
> data and the qpl job initialization.
>
> the qpl job initialization will check if the In-Memory Analytics
> Accelerator(IAA) device is available and use the IAA device first.
> If the platform does not have IAA device or the IAA device is not
> available, the qpl compression will fallback to the software path.
>
> Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
> Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>

Looks good, just some nits below.

> ---
>  migration/multifd-qpl.c | 272 +++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 271 insertions(+), 1 deletion(-)
>
> diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
> index 056a68a060..89fa51091a 100644
> --- a/migration/multifd-qpl.c
> +++ b/migration/multifd-qpl.c
> @@ -9,12 +9,282 @@
>   * This work is licensed under the terms of the GNU GPL, version 2 or later.
>   * See the COPYING file in the top-level directory.
>   */
> +
>  #include "qemu/osdep.h"
>  #include "qemu/module.h"
> +#include "qapi/error.h"
> +#include "migration.h"
> +#include "multifd.h"
> +#include "qpl/qpl.h"
> +
> +typedef struct {
> +    qpl_job **job_array;
> +    /* the number of allocated jobs */
> +    uint32_t total_job_num;
> +    /* compressed data buffer */
> +    uint8_t *zbuf;
> +    /* the length of compressed data */

array of lenghts

> +    uint32_t *zbuf_hdr;

Why the _hdr suffix if the lengths are the only data stored here?

> +    /* the status of IAA device */
> +    bool iaa_avail;
> +} QplData;
> +
> +/**
> + * check_iaa_avail: check if IAA device is available
> + *
> + * If the system does not have an IAA device, the IAA device is
> + * not enabled or the IAA work queue is not configured as a shared
> + * mode, the QPL hardware path initialization will fail.
> + *
> + * Returns true if IAA device is available, otherwise false.
> + */
> +static bool check_iaa_avail(void)
> +{
> +    qpl_job *job = NULL;
> +    uint32_t job_size = 0;
> +    qpl_path_t path = qpl_path_hardware;
> +
> +    if (qpl_get_job_size(path, &job_size) != QPL_STS_OK) {
> +        return false;
> +    }
> +    job = g_malloc0(job_size);
> +    if (qpl_init_job(path, job) != QPL_STS_OK) {
> +        g_free(job);
> +        return false;
> +    }
> +    g_free(job);
> +    return true;
> +}
> +
> +/**
> + * multifd_qpl_free_jobs: cleanup jobs
> + *
> + * Free all job resources.
> + *
> + * @qpl: pointer to the QplData structure
> + */
> +static void multifd_qpl_free_jobs(QplData *qpl)
> +{
> +    assert(qpl != NULL);
> +    for (int i = 0; i < qpl->total_job_num; i++) {
> +        qpl_fini_job(qpl->job_array[i]);
> +        g_free(qpl->job_array[i]);
> +        qpl->job_array[i] = NULL;
> +    }
> +    g_free(qpl->job_array);
> +    qpl->job_array = NULL;
> +}
> +
> +/**
> + * multifd_qpl_init_jobs: initialize jobs
> + *
> + * Initialize all jobs
> + *
> + * @qpl: pointer to the QplData structure
> + * @chan_id: multifd channel number
> + * @errp: pointer to an error
> + */
> +static int multifd_qpl_init_jobs(QplData *qpl, uint8_t chan_id, Error **errp)
> +{
> +    qpl_path_t path;
> +    qpl_status status;
> +    uint32_t job_size = 0;
> +    qpl_job *job = NULL;
> +
> +    path = qpl->iaa_avail ? qpl_path_hardware : qpl_path_software;
> +    status = qpl_get_job_size(path, &job_size);
> +    if (status != QPL_STS_OK) {
> +        error_setg(errp, "multifd: %u: qpl_get_job_size failed with error %d",
> +                   chan_id, status);
> +        return -1;
> +    }
> +    qpl->job_array = g_new0(qpl_job *, qpl->total_job_num);
> +    for (int i = 0; i < qpl->total_job_num; i++) {
> +        job = g_malloc0(job_size);
> +        status = qpl_init_job(path, job);
> +        if (status != QPL_STS_OK) {
> +            error_setg(errp, "multifd: %u: qpl_init_job failed with error %d",
> +                       chan_id, status);
> +            multifd_qpl_free_jobs(qpl);
> +            return -1;
> +        }
> +        qpl->job_array[i] = job;
> +    }
> +    return 0;
> +}
> +
> +/**
> + * multifd_qpl_init: initialize QplData structure
> + *
> + * Allocate and initialize a QplData structure
> + *
> + * Returns QplData pointer for success or NULL for error
> + *
> + * @job_num: pointer to the QplData structure

This needs updating^

> + * @job_size: the buffer size of the job
> + * @chan_id: multifd channel number
> + * @errp: pointer to an error
> + */
> +static QplData *multifd_qpl_init(uint32_t job_num, uint32_t job_size,
> +                                 uint8_t chan_id, Error **errp)
> +{
> +    QplData *qpl;
> +
> +    qpl = g_new0(QplData, 1);
> +    qpl->total_job_num = job_num;
> +    qpl->iaa_avail = check_iaa_avail();
> +    if (multifd_qpl_init_jobs(qpl, chan_id, errp) != 0) {
> +        g_free(qpl);
> +        return NULL;
> +    }
> +    qpl->zbuf = g_malloc0(job_size * job_num);
> +    qpl->zbuf_hdr = g_new0(uint32_t, job_num);
> +    return qpl;
> +}
> +
> +/**
> + * multifd_qpl_deinit: cleanup QplData structure
> + *
> + * Free jobs, comprssed buffers and QplData structure

compressed

> + *
> + * @qpl: pointer to the QplData structure
> + */
> +static void multifd_qpl_deinit(QplData *qpl)
> +{
> +    if (qpl != NULL) {
> +        multifd_qpl_free_jobs(qpl);
> +        g_free(qpl->zbuf_hdr);
> +        g_free(qpl->zbuf);
> +        g_free(qpl);
> +    }
> +}
> +
> +/**
> + * multifd_qpl_send_setup: setup send side
> + *
> + * Setup each channel with QPL compression.

s/each/the/

> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int multifd_qpl_send_setup(MultiFDSendParams *p, Error **errp)
> +{
> +    QplData *qpl;
> +
> +    qpl = multifd_qpl_init(p->page_count, p->page_size, p->id, errp);
> +    if (!qpl) {
> +        return -1;
> +    }
> +    p->compress_data = qpl;
> +
> +    /*
> +     * Each page will be compressed independently and sent using an IOV. The
> +     * additional two IOVs are used to store packet header and compressed data
> +     * length
> +     */
> +    p->iov = g_new0(struct iovec, p->page_count + 2);
> +    return 0;
> +}
> +
> +/**
> + * multifd_qpl_send_cleanup: cleanup send side
> + *
> + * Close the channel and return memory.
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static void multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
> +{
> +    multifd_qpl_deinit(p->compress_data);
> +    p->compress_data = NULL;
> +    g_free(p->iov);
> +    p->iov = NULL;
> +}
> +
> +/**
> + * multifd_qpl_send_prepare: prepare data to be able to send
> + *
> + * Create a compressed buffer with all the pages that we are going to
> + * send.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int multifd_qpl_send_prepare(MultiFDSendParams *p, Error **errp)
> +{
> +    /* Implement in next patch */
> +    return -1;
> +}
> +
> +/**
> + * multifd_qpl_recv_setup: setup receive side
> + *
> + * Create the compressed channel and buffer.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int multifd_qpl_recv_setup(MultiFDRecvParams *p, Error **errp)
> +{
> +    QplData *qpl;
> +
> +    qpl = multifd_qpl_init(p->page_count, p->page_size, p->id, errp);
> +    if (!qpl) {
> +        return -1;
> +    }
> +    p->compress_data = qpl;
> +    return 0;
> +}
> +
> +/**
> + * multifd_qpl_recv_cleanup: setup receive side
> + *
> + * Close the channel and return memory.
> + *
> + * @p: Params for the channel that we are using
> + */
> +static void multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
> +{
> +    multifd_qpl_deinit(p->compress_data);
> +    p->compress_data = NULL;
> +}
> +
> +/**
> + * multifd_qpl_recv: read the data from the channel into actual pages
> + *
> + * Read the compressed buffer, and uncompress it into the actual
> + * pages.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int multifd_qpl_recv(MultiFDRecvParams *p, Error **errp)
> +{
> +    /* Implement in next patch */
> +    return -1;
> +}
> +
> +static MultiFDMethods multifd_qpl_ops = {
> +    .send_setup = multifd_qpl_send_setup,
> +    .send_cleanup = multifd_qpl_send_cleanup,
> +    .send_prepare = multifd_qpl_send_prepare,
> +    .recv_setup = multifd_qpl_recv_setup,
> +    .recv_cleanup = multifd_qpl_recv_cleanup,
> +    .recv = multifd_qpl_recv,
> +};
>  
>  static void multifd_qpl_register(void)
>  {
> -    /* noop */
> +    multifd_register_ops(MULTIFD_COMPRESSION_QPL, &multifd_qpl_ops);
>  }
>  
>  migration_init(multifd_qpl_register);
Fabiano Rosas May 10, 2024, 8:52 p.m. UTC | #2
Yuan Liu <yuan1.liu@intel.com> writes:

> the qpl initialization includes memory allocation for compressed
> data and the qpl job initialization.
>
> the qpl job initialization will check if the In-Memory Analytics
> Accelerator(IAA) device is available and use the IAA device first.
> If the platform does not have IAA device or the IAA device is not
> available, the qpl compression will fallback to the software path.
>
> Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
> Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
> ---
>  migration/multifd-qpl.c | 272 +++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 271 insertions(+), 1 deletion(-)
>
> diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
> index 056a68a060..89fa51091a 100644
> --- a/migration/multifd-qpl.c
> +++ b/migration/multifd-qpl.c
> @@ -9,12 +9,282 @@
>   * This work is licensed under the terms of the GNU GPL, version 2 or later.
>   * See the COPYING file in the top-level directory.
>   */
> +
>  #include "qemu/osdep.h"
>  #include "qemu/module.h"
> +#include "qapi/error.h"
> +#include "migration.h"

Where is this used? I think you only need qapi/qapi-types-migration.h

> +#include "multifd.h"
> +#include "qpl/qpl.h"
> +
> +typedef struct {
> +    qpl_job **job_array;
> +    /* the number of allocated jobs */
> +    uint32_t total_job_num;
> +    /* compressed data buffer */
> +    uint8_t *zbuf;
> +    /* the length of compressed data */
> +    uint32_t *zbuf_hdr;
> +    /* the status of IAA device */
> +    bool iaa_avail;
> +} QplData;
> +
> +/**
> + * check_iaa_avail: check if IAA device is available
> + *
> + * If the system does not have an IAA device, the IAA device is
> + * not enabled or the IAA work queue is not configured as a shared
> + * mode, the QPL hardware path initialization will fail.
> + *
> + * Returns true if IAA device is available, otherwise false.
> + */
> +static bool check_iaa_avail(void)
> +{
> +    qpl_job *job = NULL;
> +    uint32_t job_size = 0;
> +    qpl_path_t path = qpl_path_hardware;
> +
> +    if (qpl_get_job_size(path, &job_size) != QPL_STS_OK) {
> +        return false;
> +    }
> +    job = g_malloc0(job_size);
> +    if (qpl_init_job(path, job) != QPL_STS_OK) {
> +        g_free(job);
> +        return false;
> +    }
> +    g_free(job);
> +    return true;
> +}
> +
> +/**
> + * multifd_qpl_free_jobs: cleanup jobs
> + *
> + * Free all job resources.
> + *
> + * @qpl: pointer to the QplData structure
> + */
> +static void multifd_qpl_free_jobs(QplData *qpl)
> +{
> +    assert(qpl != NULL);
> +    for (int i = 0; i < qpl->total_job_num; i++) {
> +        qpl_fini_job(qpl->job_array[i]);
> +        g_free(qpl->job_array[i]);
> +        qpl->job_array[i] = NULL;
> +    }
> +    g_free(qpl->job_array);
> +    qpl->job_array = NULL;
> +}
> +
> +/**
> + * multifd_qpl_init_jobs: initialize jobs
> + *
> + * Initialize all jobs
> + *
> + * @qpl: pointer to the QplData structure
> + * @chan_id: multifd channel number
> + * @errp: pointer to an error
> + */
> +static int multifd_qpl_init_jobs(QplData *qpl, uint8_t chan_id, Error **errp)
> +{
> +    qpl_path_t path;
> +    qpl_status status;
> +    uint32_t job_size = 0;
> +    qpl_job *job = NULL;
> +
> +    path = qpl->iaa_avail ? qpl_path_hardware : qpl_path_software;
> +    status = qpl_get_job_size(path, &job_size);
> +    if (status != QPL_STS_OK) {
> +        error_setg(errp, "multifd: %u: qpl_get_job_size failed with error %d",
> +                   chan_id, status);
> +        return -1;
> +    }
> +    qpl->job_array = g_new0(qpl_job *, qpl->total_job_num);
> +    for (int i = 0; i < qpl->total_job_num; i++) {
> +        job = g_malloc0(job_size);
> +        status = qpl_init_job(path, job);
> +        if (status != QPL_STS_OK) {
> +            error_setg(errp, "multifd: %u: qpl_init_job failed with error %d",
> +                       chan_id, status);
> +            multifd_qpl_free_jobs(qpl);
> +            return -1;
> +        }
> +        qpl->job_array[i] = job;
> +    }
> +    return 0;
> +}
> +
> +/**
> + * multifd_qpl_init: initialize QplData structure
> + *
> + * Allocate and initialize a QplData structure
> + *
> + * Returns QplData pointer for success or NULL for error
> + *
> + * @job_num: pointer to the QplData structure
> + * @job_size: the buffer size of the job
> + * @chan_id: multifd channel number
> + * @errp: pointer to an error
> + */
> +static QplData *multifd_qpl_init(uint32_t job_num, uint32_t job_size,
> +                                 uint8_t chan_id, Error **errp)
> +{
> +    QplData *qpl;
> +
> +    qpl = g_new0(QplData, 1);
> +    qpl->total_job_num = job_num;
> +    qpl->iaa_avail = check_iaa_avail();
> +    if (multifd_qpl_init_jobs(qpl, chan_id, errp) != 0) {
> +        g_free(qpl);
> +        return NULL;
> +    }
> +    qpl->zbuf = g_malloc0(job_size * job_num);
> +    qpl->zbuf_hdr = g_new0(uint32_t, job_num);
> +    return qpl;
> +}
> +
> +/**
> + * multifd_qpl_deinit: cleanup QplData structure
> + *
> + * Free jobs, comprssed buffers and QplData structure
> + *
> + * @qpl: pointer to the QplData structure
> + */
> +static void multifd_qpl_deinit(QplData *qpl)
> +{
> +    if (qpl != NULL) {
> +        multifd_qpl_free_jobs(qpl);
> +        g_free(qpl->zbuf_hdr);
> +        g_free(qpl->zbuf);
> +        g_free(qpl);
> +    }
> +}
> +
> +/**
> + * multifd_qpl_send_setup: setup send side
> + *
> + * Setup each channel with QPL compression.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int multifd_qpl_send_setup(MultiFDSendParams *p, Error **errp)
> +{
> +    QplData *qpl;
> +
> +    qpl = multifd_qpl_init(p->page_count, p->page_size, p->id, errp);
> +    if (!qpl) {
> +        return -1;
> +    }
> +    p->compress_data = qpl;
> +
> +    /*
> +     * Each page will be compressed independently and sent using an IOV. The
> +     * additional two IOVs are used to store packet header and compressed data
> +     * length
> +     */
> +    p->iov = g_new0(struct iovec, p->page_count + 2);
> +    return 0;
> +}
> +
> +/**
> + * multifd_qpl_send_cleanup: cleanup send side
> + *
> + * Close the channel and return memory.
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static void multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
> +{
> +    multifd_qpl_deinit(p->compress_data);
> +    p->compress_data = NULL;
> +    g_free(p->iov);
> +    p->iov = NULL;
> +}
> +
> +/**
> + * multifd_qpl_send_prepare: prepare data to be able to send
> + *
> + * Create a compressed buffer with all the pages that we are going to
> + * send.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int multifd_qpl_send_prepare(MultiFDSendParams *p, Error **errp)
> +{
> +    /* Implement in next patch */
> +    return -1;
> +}
> +
> +/**
> + * multifd_qpl_recv_setup: setup receive side
> + *
> + * Create the compressed channel and buffer.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int multifd_qpl_recv_setup(MultiFDRecvParams *p, Error **errp)
> +{
> +    QplData *qpl;
> +
> +    qpl = multifd_qpl_init(p->page_count, p->page_size, p->id, errp);
> +    if (!qpl) {
> +        return -1;
> +    }
> +    p->compress_data = qpl;
> +    return 0;
> +}
> +
> +/**
> + * multifd_qpl_recv_cleanup: setup receive side
> + *
> + * Close the channel and return memory.
> + *
> + * @p: Params for the channel that we are using
> + */
> +static void multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
> +{
> +    multifd_qpl_deinit(p->compress_data);
> +    p->compress_data = NULL;
> +}
> +
> +/**
> + * multifd_qpl_recv: read the data from the channel into actual pages
> + *
> + * Read the compressed buffer, and uncompress it into the actual
> + * pages.
> + *
> + * Returns 0 for success or -1 for error
> + *
> + * @p: Params for the channel that we are using
> + * @errp: pointer to an error
> + */
> +static int multifd_qpl_recv(MultiFDRecvParams *p, Error **errp)
> +{
> +    /* Implement in next patch */
> +    return -1;
> +}
> +
> +static MultiFDMethods multifd_qpl_ops = {
> +    .send_setup = multifd_qpl_send_setup,
> +    .send_cleanup = multifd_qpl_send_cleanup,
> +    .send_prepare = multifd_qpl_send_prepare,
> +    .recv_setup = multifd_qpl_recv_setup,
> +    .recv_cleanup = multifd_qpl_recv_cleanup,
> +    .recv = multifd_qpl_recv,
> +};
>  
>  static void multifd_qpl_register(void)
>  {
> -    /* noop */
> +    multifd_register_ops(MULTIFD_COMPRESSION_QPL, &multifd_qpl_ops);
>  }
>  
>  migration_init(multifd_qpl_register);
Liu, Yuan1 May 11, 2024, 12:39 p.m. UTC | #3
> -----Original Message-----
> From: Fabiano Rosas <farosas@suse.de>
> Sent: Saturday, May 11, 2024 4:53 AM
> To: Liu, Yuan1 <yuan1.liu@intel.com>; peterx@redhat.com
> Cc: qemu-devel@nongnu.org; Liu, Yuan1 <yuan1.liu@intel.com>; Zou, Nanhai
> <nanhai.zou@intel.com>
> Subject: Re: [PATCH v6 5/7] migration/multifd: implement initialization of
> qpl compression
> 
> Yuan Liu <yuan1.liu@intel.com> writes:
> 
> > the qpl initialization includes memory allocation for compressed
> > data and the qpl job initialization.
> >
> > the qpl job initialization will check if the In-Memory Analytics
> > Accelerator(IAA) device is available and use the IAA device first.
> > If the platform does not have IAA device or the IAA device is not
> > available, the qpl compression will fallback to the software path.
> >
> > Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
> > Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
> > ---
> >  migration/multifd-qpl.c | 272 +++++++++++++++++++++++++++++++++++++++-
> >  1 file changed, 271 insertions(+), 1 deletion(-)
> >
> > diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
> > index 056a68a060..89fa51091a 100644
> > --- a/migration/multifd-qpl.c
> > +++ b/migration/multifd-qpl.c
> > @@ -9,12 +9,282 @@
> >   * This work is licensed under the terms of the GNU GPL, version 2 or
> later.
> >   * See the COPYING file in the top-level directory.
> >   */
> > +
> >  #include "qemu/osdep.h"
> >  #include "qemu/module.h"
> > +#include "qapi/error.h"
> > +#include "migration.h"
> 
> Where is this used? I think you only need qapi/qapi-types-migration.h

Yes, the qapi/qapi-types-migration.h is enough, no need to include migration.h
I will fix this next version.

> > +#include "multifd.h"
> > +#include "qpl/qpl.h"
> > +
> > +typedef struct {
> > +    qpl_job **job_array;
> > +    /* the number of allocated jobs */
> > +    uint32_t total_job_num;
> > +    /* compressed data buffer */
> > +    uint8_t *zbuf;
> > +    /* the length of compressed data */
> > +    uint32_t *zbuf_hdr;
> > +    /* the status of IAA device */
> > +    bool iaa_avail;
> > +} QplData;
> > +
> > +/**
> > + * check_iaa_avail: check if IAA device is available
> > + *
> > + * If the system does not have an IAA device, the IAA device is
> > + * not enabled or the IAA work queue is not configured as a shared
> > + * mode, the QPL hardware path initialization will fail.
> > + *
> > + * Returns true if IAA device is available, otherwise false.
> > + */
> > +static bool check_iaa_avail(void)
> > +{
> > +    qpl_job *job = NULL;
> > +    uint32_t job_size = 0;
> > +    qpl_path_t path = qpl_path_hardware;
> > +
> > +    if (qpl_get_job_size(path, &job_size) != QPL_STS_OK) {
> > +        return false;
> > +    }
> > +    job = g_malloc0(job_size);
> > +    if (qpl_init_job(path, job) != QPL_STS_OK) {
> > +        g_free(job);
> > +        return false;
> > +    }
> > +    g_free(job);
> > +    return true;
> > +}
> > +
> > +/**
> > + * multifd_qpl_free_jobs: cleanup jobs
> > + *
> > + * Free all job resources.
> > + *
> > + * @qpl: pointer to the QplData structure
> > + */
> > +static void multifd_qpl_free_jobs(QplData *qpl)
> > +{
> > +    assert(qpl != NULL);
> > +    for (int i = 0; i < qpl->total_job_num; i++) {
> > +        qpl_fini_job(qpl->job_array[i]);
> > +        g_free(qpl->job_array[i]);
> > +        qpl->job_array[i] = NULL;
> > +    }
> > +    g_free(qpl->job_array);
> > +    qpl->job_array = NULL;
> > +}
> > +
> > +/**
> > + * multifd_qpl_init_jobs: initialize jobs
> > + *
> > + * Initialize all jobs
> > + *
> > + * @qpl: pointer to the QplData structure
> > + * @chan_id: multifd channel number
> > + * @errp: pointer to an error
> > + */
> > +static int multifd_qpl_init_jobs(QplData *qpl, uint8_t chan_id, Error
> **errp)
> > +{
> > +    qpl_path_t path;
> > +    qpl_status status;
> > +    uint32_t job_size = 0;
> > +    qpl_job *job = NULL;
> > +
> > +    path = qpl->iaa_avail ? qpl_path_hardware : qpl_path_software;
> > +    status = qpl_get_job_size(path, &job_size);
> > +    if (status != QPL_STS_OK) {
> > +        error_setg(errp, "multifd: %u: qpl_get_job_size failed with
> error %d",
> > +                   chan_id, status);
> > +        return -1;
> > +    }
> > +    qpl->job_array = g_new0(qpl_job *, qpl->total_job_num);
> > +    for (int i = 0; i < qpl->total_job_num; i++) {
> > +        job = g_malloc0(job_size);
> > +        status = qpl_init_job(path, job);
> > +        if (status != QPL_STS_OK) {
> > +            error_setg(errp, "multifd: %u: qpl_init_job failed with
> error %d",
> > +                       chan_id, status);
> > +            multifd_qpl_free_jobs(qpl);
> > +            return -1;
> > +        }
> > +        qpl->job_array[i] = job;
> > +    }
> > +    return 0;
> > +}
> > +
> > +/**
> > + * multifd_qpl_init: initialize QplData structure
> > + *
> > + * Allocate and initialize a QplData structure
> > + *
> > + * Returns QplData pointer for success or NULL for error
> > + *
> > + * @job_num: pointer to the QplData structure
> > + * @job_size: the buffer size of the job
> > + * @chan_id: multifd channel number
> > + * @errp: pointer to an error
> > + */
> > +static QplData *multifd_qpl_init(uint32_t job_num, uint32_t job_size,
> > +                                 uint8_t chan_id, Error **errp)
> > +{
> > +    QplData *qpl;
> > +
> > +    qpl = g_new0(QplData, 1);
> > +    qpl->total_job_num = job_num;
> > +    qpl->iaa_avail = check_iaa_avail();
> > +    if (multifd_qpl_init_jobs(qpl, chan_id, errp) != 0) {
> > +        g_free(qpl);
> > +        return NULL;
> > +    }
> > +    qpl->zbuf = g_malloc0(job_size * job_num);
> > +    qpl->zbuf_hdr = g_new0(uint32_t, job_num);
> > +    return qpl;
> > +}
> > +
> > +/**
> > + * multifd_qpl_deinit: cleanup QplData structure
> > + *
> > + * Free jobs, comprssed buffers and QplData structure
> > + *
> > + * @qpl: pointer to the QplData structure
> > + */
> > +static void multifd_qpl_deinit(QplData *qpl)
> > +{
> > +    if (qpl != NULL) {
> > +        multifd_qpl_free_jobs(qpl);
> > +        g_free(qpl->zbuf_hdr);
> > +        g_free(qpl->zbuf);
> > +        g_free(qpl);
> > +    }
> > +}
> > +
> > +/**
> > + * multifd_qpl_send_setup: setup send side
> > + *
> > + * Setup each channel with QPL compression.
> > + *
> > + * Returns 0 for success or -1 for error
> > + *
> > + * @p: Params for the channel that we are using
> > + * @errp: pointer to an error
> > + */
> > +static int multifd_qpl_send_setup(MultiFDSendParams *p, Error **errp)
> > +{
> > +    QplData *qpl;
> > +
> > +    qpl = multifd_qpl_init(p->page_count, p->page_size, p->id, errp);
> > +    if (!qpl) {
> > +        return -1;
> > +    }
> > +    p->compress_data = qpl;
> > +
> > +    /*
> > +     * Each page will be compressed independently and sent using an
> IOV. The
> > +     * additional two IOVs are used to store packet header and
> compressed data
> > +     * length
> > +     */
> > +    p->iov = g_new0(struct iovec, p->page_count + 2);
> > +    return 0;
> > +}
> > +
> > +/**
> > + * multifd_qpl_send_cleanup: cleanup send side
> > + *
> > + * Close the channel and return memory.
> > + *
> > + * @p: Params for the channel that we are using
> > + * @errp: pointer to an error
> > + */
> > +static void multifd_qpl_send_cleanup(MultiFDSendParams *p, Error
> **errp)
> > +{
> > +    multifd_qpl_deinit(p->compress_data);
> > +    p->compress_data = NULL;
> > +    g_free(p->iov);
> > +    p->iov = NULL;
> > +}
> > +
> > +/**
> > + * multifd_qpl_send_prepare: prepare data to be able to send
> > + *
> > + * Create a compressed buffer with all the pages that we are going to
> > + * send.
> > + *
> > + * Returns 0 for success or -1 for error
> > + *
> > + * @p: Params for the channel that we are using
> > + * @errp: pointer to an error
> > + */
> > +static int multifd_qpl_send_prepare(MultiFDSendParams *p, Error **errp)
> > +{
> > +    /* Implement in next patch */
> > +    return -1;
> > +}
> > +
> > +/**
> > + * multifd_qpl_recv_setup: setup receive side
> > + *
> > + * Create the compressed channel and buffer.
> > + *
> > + * Returns 0 for success or -1 for error
> > + *
> > + * @p: Params for the channel that we are using
> > + * @errp: pointer to an error
> > + */
> > +static int multifd_qpl_recv_setup(MultiFDRecvParams *p, Error **errp)
> > +{
> > +    QplData *qpl;
> > +
> > +    qpl = multifd_qpl_init(p->page_count, p->page_size, p->id, errp);
> > +    if (!qpl) {
> > +        return -1;
> > +    }
> > +    p->compress_data = qpl;
> > +    return 0;
> > +}
> > +
> > +/**
> > + * multifd_qpl_recv_cleanup: setup receive side
> > + *
> > + * Close the channel and return memory.
> > + *
> > + * @p: Params for the channel that we are using
> > + */
> > +static void multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
> > +{
> > +    multifd_qpl_deinit(p->compress_data);
> > +    p->compress_data = NULL;
> > +}
> > +
> > +/**
> > + * multifd_qpl_recv: read the data from the channel into actual pages
> > + *
> > + * Read the compressed buffer, and uncompress it into the actual
> > + * pages.
> > + *
> > + * Returns 0 for success or -1 for error
> > + *
> > + * @p: Params for the channel that we are using
> > + * @errp: pointer to an error
> > + */
> > +static int multifd_qpl_recv(MultiFDRecvParams *p, Error **errp)
> > +{
> > +    /* Implement in next patch */
> > +    return -1;
> > +}
> > +
> > +static MultiFDMethods multifd_qpl_ops = {
> > +    .send_setup = multifd_qpl_send_setup,
> > +    .send_cleanup = multifd_qpl_send_cleanup,
> > +    .send_prepare = multifd_qpl_send_prepare,
> > +    .recv_setup = multifd_qpl_recv_setup,
> > +    .recv_cleanup = multifd_qpl_recv_cleanup,
> > +    .recv = multifd_qpl_recv,
> > +};
> >
> >  static void multifd_qpl_register(void)
> >  {
> > -    /* noop */
> > +    multifd_register_ops(MULTIFD_COMPRESSION_QPL, &multifd_qpl_ops);
> >  }
> >
> >  migration_init(multifd_qpl_register);
Liu, Yuan1 May 11, 2024, 12:55 p.m. UTC | #4
> -----Original Message-----
> From: Fabiano Rosas <farosas@suse.de>
> Sent: Saturday, May 11, 2024 4:45 AM
> To: Liu, Yuan1 <yuan1.liu@intel.com>; peterx@redhat.com
> Cc: qemu-devel@nongnu.org; Liu, Yuan1 <yuan1.liu@intel.com>; Zou, Nanhai
> <nanhai.zou@intel.com>
> Subject: Re: [PATCH v6 5/7] migration/multifd: implement initialization of
> qpl compression
> 
> Yuan Liu <yuan1.liu@intel.com> writes:
> 
> > the qpl initialization includes memory allocation for compressed
> > data and the qpl job initialization.
> >
> > the qpl job initialization will check if the In-Memory Analytics
> > Accelerator(IAA) device is available and use the IAA device first.
> > If the platform does not have IAA device or the IAA device is not
> > available, the qpl compression will fallback to the software path.
> >
> > Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
> > Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
> 
> Looks good, just some nits below.
> 
> > ---
> >  migration/multifd-qpl.c | 272 +++++++++++++++++++++++++++++++++++++++-
> >  1 file changed, 271 insertions(+), 1 deletion(-)
> >
> > diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
> > index 056a68a060..89fa51091a 100644
> > --- a/migration/multifd-qpl.c
> > +++ b/migration/multifd-qpl.c
> > @@ -9,12 +9,282 @@
> >   * This work is licensed under the terms of the GNU GPL, version 2 or
> later.
> >   * See the COPYING file in the top-level directory.
> >   */
> > +
> >  #include "qemu/osdep.h"
> >  #include "qemu/module.h"
> > +#include "qapi/error.h"
> > +#include "migration.h"
> > +#include "multifd.h"
> > +#include "qpl/qpl.h"
> > +
> > +typedef struct {
> > +    qpl_job **job_array;
> > +    /* the number of allocated jobs */
> > +    uint32_t total_job_num;
> > +    /* compressed data buffer */
> > +    uint8_t *zbuf;
> > +    /* the length of compressed data */
> 
> array of lenghts

Thanks for the comment, I will improve the comment next version.

> > +    uint32_t *zbuf_hdr;
> 
> Why the _hdr suffix if the lengths are the only data stored here?

This zbuf_hdr is confusing, I will use lens instead of zbuf_hdr next version

> > +    /* the status of IAA device */
> > +    bool iaa_avail;
> > +} QplData;
> > +
> > +/**
> > + * check_iaa_avail: check if IAA device is available
> > + *
> > + * If the system does not have an IAA device, the IAA device is
> > + * not enabled or the IAA work queue is not configured as a shared
> > + * mode, the QPL hardware path initialization will fail.
> > + *
> > + * Returns true if IAA device is available, otherwise false.
> > + */
> > +static bool check_iaa_avail(void)
> > +{
> > +    qpl_job *job = NULL;
> > +    uint32_t job_size = 0;
> > +    qpl_path_t path = qpl_path_hardware;
> > +
> > +    if (qpl_get_job_size(path, &job_size) != QPL_STS_OK) {
> > +        return false;
> > +    }
> > +    job = g_malloc0(job_size);
> > +    if (qpl_init_job(path, job) != QPL_STS_OK) {
> > +        g_free(job);
> > +        return false;
> > +    }
> > +    g_free(job);
> > +    return true;
> > +}
> > +
> > +/**
> > + * multifd_qpl_free_jobs: cleanup jobs
> > + *
> > + * Free all job resources.
> > + *
> > + * @qpl: pointer to the QplData structure
> > + */
> > +static void multifd_qpl_free_jobs(QplData *qpl)
> > +{
> > +    assert(qpl != NULL);
> > +    for (int i = 0; i < qpl->total_job_num; i++) {
> > +        qpl_fini_job(qpl->job_array[i]);
> > +        g_free(qpl->job_array[i]);
> > +        qpl->job_array[i] = NULL;
> > +    }
> > +    g_free(qpl->job_array);
> > +    qpl->job_array = NULL;
> > +}
> > +
> > +/**
> > + * multifd_qpl_init_jobs: initialize jobs
> > + *
> > + * Initialize all jobs
> > + *
> > + * @qpl: pointer to the QplData structure
> > + * @chan_id: multifd channel number
> > + * @errp: pointer to an error
> > + */
> > +static int multifd_qpl_init_jobs(QplData *qpl, uint8_t chan_id, Error
> **errp)
> > +{
> > +    qpl_path_t path;
> > +    qpl_status status;
> > +    uint32_t job_size = 0;
> > +    qpl_job *job = NULL;
> > +
> > +    path = qpl->iaa_avail ? qpl_path_hardware : qpl_path_software;
> > +    status = qpl_get_job_size(path, &job_size);
> > +    if (status != QPL_STS_OK) {
> > +        error_setg(errp, "multifd: %u: qpl_get_job_size failed with
> error %d",
> > +                   chan_id, status);
> > +        return -1;
> > +    }
> > +    qpl->job_array = g_new0(qpl_job *, qpl->total_job_num);
> > +    for (int i = 0; i < qpl->total_job_num; i++) {
> > +        job = g_malloc0(job_size);
> > +        status = qpl_init_job(path, job);
> > +        if (status != QPL_STS_OK) {
> > +            error_setg(errp, "multifd: %u: qpl_init_job failed with
> error %d",
> > +                       chan_id, status);
> > +            multifd_qpl_free_jobs(qpl);
> > +            return -1;
> > +        }
> > +        qpl->job_array[i] = job;
> > +    }
> > +    return 0;
> > +}
> > +
> > +/**
> > + * multifd_qpl_init: initialize QplData structure
> > + *
> > + * Allocate and initialize a QplData structure
> > + *
> > + * Returns QplData pointer for success or NULL for error
> > + *
> > + * @job_num: pointer to the QplData structure
> 
> This needs updating^

Yes, thanks for the comment, I will fix it next version

> > + * @job_size: the buffer size of the job
> > + * @chan_id: multifd channel number
> > + * @errp: pointer to an error
> > + */
> > +static QplData *multifd_qpl_init(uint32_t job_num, uint32_t job_size,
> > +                                 uint8_t chan_id, Error **errp)
> > +{
> > +    QplData *qpl;
> > +
> > +    qpl = g_new0(QplData, 1);
> > +    qpl->total_job_num = job_num;
> > +    qpl->iaa_avail = check_iaa_avail();
> > +    if (multifd_qpl_init_jobs(qpl, chan_id, errp) != 0) {
> > +        g_free(qpl);
> > +        return NULL;
> > +    }
> > +    qpl->zbuf = g_malloc0(job_size * job_num);
> > +    qpl->zbuf_hdr = g_new0(uint32_t, job_num);
> > +    return qpl;
> > +}
> > +
> > +/**
> > + * multifd_qpl_deinit: cleanup QplData structure
> > + *
> > + * Free jobs, comprssed buffers and QplData structure
> 
> compressed

Thanks for the comment, I will fix the typo next version

> > + *
> > + * @qpl: pointer to the QplData structure
> > + */
> > +static void multifd_qpl_deinit(QplData *qpl)
> > +{
> > +    if (qpl != NULL) {
> > +        multifd_qpl_free_jobs(qpl);
> > +        g_free(qpl->zbuf_hdr);
> > +        g_free(qpl->zbuf);
> > +        g_free(qpl);
> > +    }
> > +}
> > +
> > +/**
> > + * multifd_qpl_send_setup: setup send side
> > + *
> > + * Setup each channel with QPL compression.
> 
> s/each/the/

Sure, I will refine the comment next version

> > + *
> > + * Returns 0 for success or -1 for error
> > + *
> > + * @p: Params for the channel that we are using
> > + * @errp: pointer to an error
> > + */
> > +static int multifd_qpl_send_setup(MultiFDSendParams *p, Error **errp)
> > +{
> > +    QplData *qpl;
> > +
> > +    qpl = multifd_qpl_init(p->page_count, p->page_size, p->id, errp);
> > +    if (!qpl) {
> > +        return -1;
> > +    }
> > +    p->compress_data = qpl;
> > +
> > +    /*
> > +     * Each page will be compressed independently and sent using an
> IOV. The
> > +     * additional two IOVs are used to store packet header and
> compressed data
> > +     * length
> > +     */
> > +    p->iov = g_new0(struct iovec, p->page_count + 2);
> > +    return 0;
> > +}
> > +
> > +/**
> > + * multifd_qpl_send_cleanup: cleanup send side
> > + *
> > + * Close the channel and return memory.
> > + *
> > + * @p: Params for the channel that we are using
> > + * @errp: pointer to an error
> > + */
> > +static void multifd_qpl_send_cleanup(MultiFDSendParams *p, Error
> **errp)
> > +{
> > +    multifd_qpl_deinit(p->compress_data);
> > +    p->compress_data = NULL;
> > +    g_free(p->iov);
> > +    p->iov = NULL;
> > +}
> > +
> > +/**
> > + * multifd_qpl_send_prepare: prepare data to be able to send
> > + *
> > + * Create a compressed buffer with all the pages that we are going to
> > + * send.
> > + *
> > + * Returns 0 for success or -1 for error
> > + *
> > + * @p: Params for the channel that we are using
> > + * @errp: pointer to an error
> > + */
> > +static int multifd_qpl_send_prepare(MultiFDSendParams *p, Error **errp)
> > +{
> > +    /* Implement in next patch */
> > +    return -1;
> > +}
> > +
> > +/**
> > + * multifd_qpl_recv_setup: setup receive side
> > + *
> > + * Create the compressed channel and buffer.
> > + *
> > + * Returns 0 for success or -1 for error
> > + *
> > + * @p: Params for the channel that we are using
> > + * @errp: pointer to an error
> > + */
> > +static int multifd_qpl_recv_setup(MultiFDRecvParams *p, Error **errp)
> > +{
> > +    QplData *qpl;
> > +
> > +    qpl = multifd_qpl_init(p->page_count, p->page_size, p->id, errp);
> > +    if (!qpl) {
> > +        return -1;
> > +    }
> > +    p->compress_data = qpl;
> > +    return 0;
> > +}
> > +
> > +/**
> > + * multifd_qpl_recv_cleanup: setup receive side
> > + *
> > + * Close the channel and return memory.
> > + *
> > + * @p: Params for the channel that we are using
> > + */
> > +static void multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
> > +{
> > +    multifd_qpl_deinit(p->compress_data);
> > +    p->compress_data = NULL;
> > +}
> > +
> > +/**
> > + * multifd_qpl_recv: read the data from the channel into actual pages
> > + *
> > + * Read the compressed buffer, and uncompress it into the actual
> > + * pages.
> > + *
> > + * Returns 0 for success or -1 for error
> > + *
> > + * @p: Params for the channel that we are using
> > + * @errp: pointer to an error
> > + */
> > +static int multifd_qpl_recv(MultiFDRecvParams *p, Error **errp)
> > +{
> > +    /* Implement in next patch */
> > +    return -1;
> > +}
> > +
> > +static MultiFDMethods multifd_qpl_ops = {
> > +    .send_setup = multifd_qpl_send_setup,
> > +    .send_cleanup = multifd_qpl_send_cleanup,
> > +    .send_prepare = multifd_qpl_send_prepare,
> > +    .recv_setup = multifd_qpl_recv_setup,
> > +    .recv_cleanup = multifd_qpl_recv_cleanup,
> > +    .recv = multifd_qpl_recv,
> > +};
> >
> >  static void multifd_qpl_register(void)
> >  {
> > -    /* noop */
> > +    multifd_register_ops(MULTIFD_COMPRESSION_QPL, &multifd_qpl_ops);
> >  }
> >
> >  migration_init(multifd_qpl_register);
diff mbox series

Patch

diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
index 056a68a060..89fa51091a 100644
--- a/migration/multifd-qpl.c
+++ b/migration/multifd-qpl.c
@@ -9,12 +9,282 @@ 
  * This work is licensed under the terms of the GNU GPL, version 2 or later.
  * See the COPYING file in the top-level directory.
  */
+
 #include "qemu/osdep.h"
 #include "qemu/module.h"
+#include "qapi/error.h"
+#include "migration.h"
+#include "multifd.h"
+#include "qpl/qpl.h"
+
+typedef struct {
+    qpl_job **job_array;
+    /* the number of allocated jobs */
+    uint32_t total_job_num;
+    /* compressed data buffer */
+    uint8_t *zbuf;
+    /* the length of compressed data */
+    uint32_t *zbuf_hdr;
+    /* the status of IAA device */
+    bool iaa_avail;
+} QplData;
+
+/**
+ * check_iaa_avail: check if IAA device is available
+ *
+ * If the system does not have an IAA device, the IAA device is
+ * not enabled or the IAA work queue is not configured as a shared
+ * mode, the QPL hardware path initialization will fail.
+ *
+ * Returns true if IAA device is available, otherwise false.
+ */
+static bool check_iaa_avail(void)
+{
+    qpl_job *job = NULL;
+    uint32_t job_size = 0;
+    qpl_path_t path = qpl_path_hardware;
+
+    if (qpl_get_job_size(path, &job_size) != QPL_STS_OK) {
+        return false;
+    }
+    job = g_malloc0(job_size);
+    if (qpl_init_job(path, job) != QPL_STS_OK) {
+        g_free(job);
+        return false;
+    }
+    g_free(job);
+    return true;
+}
+
+/**
+ * multifd_qpl_free_jobs: cleanup jobs
+ *
+ * Free all job resources.
+ *
+ * @qpl: pointer to the QplData structure
+ */
+static void multifd_qpl_free_jobs(QplData *qpl)
+{
+    assert(qpl != NULL);
+    for (int i = 0; i < qpl->total_job_num; i++) {
+        qpl_fini_job(qpl->job_array[i]);
+        g_free(qpl->job_array[i]);
+        qpl->job_array[i] = NULL;
+    }
+    g_free(qpl->job_array);
+    qpl->job_array = NULL;
+}
+
+/**
+ * multifd_qpl_init_jobs: initialize jobs
+ *
+ * Initialize all jobs
+ *
+ * @qpl: pointer to the QplData structure
+ * @chan_id: multifd channel number
+ * @errp: pointer to an error
+ */
+static int multifd_qpl_init_jobs(QplData *qpl, uint8_t chan_id, Error **errp)
+{
+    qpl_path_t path;
+    qpl_status status;
+    uint32_t job_size = 0;
+    qpl_job *job = NULL;
+
+    path = qpl->iaa_avail ? qpl_path_hardware : qpl_path_software;
+    status = qpl_get_job_size(path, &job_size);
+    if (status != QPL_STS_OK) {
+        error_setg(errp, "multifd: %u: qpl_get_job_size failed with error %d",
+                   chan_id, status);
+        return -1;
+    }
+    qpl->job_array = g_new0(qpl_job *, qpl->total_job_num);
+    for (int i = 0; i < qpl->total_job_num; i++) {
+        job = g_malloc0(job_size);
+        status = qpl_init_job(path, job);
+        if (status != QPL_STS_OK) {
+            error_setg(errp, "multifd: %u: qpl_init_job failed with error %d",
+                       chan_id, status);
+            multifd_qpl_free_jobs(qpl);
+            return -1;
+        }
+        qpl->job_array[i] = job;
+    }
+    return 0;
+}
+
+/**
+ * multifd_qpl_init: initialize QplData structure
+ *
+ * Allocate and initialize a QplData structure
+ *
+ * Returns QplData pointer for success or NULL for error
+ *
+ * @job_num: pointer to the QplData structure
+ * @job_size: the buffer size of the job
+ * @chan_id: multifd channel number
+ * @errp: pointer to an error
+ */
+static QplData *multifd_qpl_init(uint32_t job_num, uint32_t job_size,
+                                 uint8_t chan_id, Error **errp)
+{
+    QplData *qpl;
+
+    qpl = g_new0(QplData, 1);
+    qpl->total_job_num = job_num;
+    qpl->iaa_avail = check_iaa_avail();
+    if (multifd_qpl_init_jobs(qpl, chan_id, errp) != 0) {
+        g_free(qpl);
+        return NULL;
+    }
+    qpl->zbuf = g_malloc0(job_size * job_num);
+    qpl->zbuf_hdr = g_new0(uint32_t, job_num);
+    return qpl;
+}
+
+/**
+ * multifd_qpl_deinit: cleanup QplData structure
+ *
+ * Free jobs, comprssed buffers and QplData structure
+ *
+ * @qpl: pointer to the QplData structure
+ */
+static void multifd_qpl_deinit(QplData *qpl)
+{
+    if (qpl != NULL) {
+        multifd_qpl_free_jobs(qpl);
+        g_free(qpl->zbuf_hdr);
+        g_free(qpl->zbuf);
+        g_free(qpl);
+    }
+}
+
+/**
+ * multifd_qpl_send_setup: setup send side
+ *
+ * Setup each channel with QPL compression.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int multifd_qpl_send_setup(MultiFDSendParams *p, Error **errp)
+{
+    QplData *qpl;
+
+    qpl = multifd_qpl_init(p->page_count, p->page_size, p->id, errp);
+    if (!qpl) {
+        return -1;
+    }
+    p->compress_data = qpl;
+
+    /*
+     * Each page will be compressed independently and sent using an IOV. The
+     * additional two IOVs are used to store packet header and compressed data
+     * length
+     */
+    p->iov = g_new0(struct iovec, p->page_count + 2);
+    return 0;
+}
+
+/**
+ * multifd_qpl_send_cleanup: cleanup send side
+ *
+ * Close the channel and return memory.
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static void multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
+{
+    multifd_qpl_deinit(p->compress_data);
+    p->compress_data = NULL;
+    g_free(p->iov);
+    p->iov = NULL;
+}
+
+/**
+ * multifd_qpl_send_prepare: prepare data to be able to send
+ *
+ * Create a compressed buffer with all the pages that we are going to
+ * send.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int multifd_qpl_send_prepare(MultiFDSendParams *p, Error **errp)
+{
+    /* Implement in next patch */
+    return -1;
+}
+
+/**
+ * multifd_qpl_recv_setup: setup receive side
+ *
+ * Create the compressed channel and buffer.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int multifd_qpl_recv_setup(MultiFDRecvParams *p, Error **errp)
+{
+    QplData *qpl;
+
+    qpl = multifd_qpl_init(p->page_count, p->page_size, p->id, errp);
+    if (!qpl) {
+        return -1;
+    }
+    p->compress_data = qpl;
+    return 0;
+}
+
+/**
+ * multifd_qpl_recv_cleanup: setup receive side
+ *
+ * Close the channel and return memory.
+ *
+ * @p: Params for the channel that we are using
+ */
+static void multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
+{
+    multifd_qpl_deinit(p->compress_data);
+    p->compress_data = NULL;
+}
+
+/**
+ * multifd_qpl_recv: read the data from the channel into actual pages
+ *
+ * Read the compressed buffer, and uncompress it into the actual
+ * pages.
+ *
+ * Returns 0 for success or -1 for error
+ *
+ * @p: Params for the channel that we are using
+ * @errp: pointer to an error
+ */
+static int multifd_qpl_recv(MultiFDRecvParams *p, Error **errp)
+{
+    /* Implement in next patch */
+    return -1;
+}
+
+static MultiFDMethods multifd_qpl_ops = {
+    .send_setup = multifd_qpl_send_setup,
+    .send_cleanup = multifd_qpl_send_cleanup,
+    .send_prepare = multifd_qpl_send_prepare,
+    .recv_setup = multifd_qpl_recv_setup,
+    .recv_cleanup = multifd_qpl_recv_cleanup,
+    .recv = multifd_qpl_recv,
+};
 
 static void multifd_qpl_register(void)
 {
-    /* noop */
+    multifd_register_ops(MULTIFD_COMPRESSION_QPL, &multifd_qpl_ops);
 }
 
 migration_init(multifd_qpl_register);