diff mbox series

[v2,09/20] util/dsa: Implement DSA task asynchronous completion thread model.

Message ID 20231114054032.1192027-10-hao.xiang@bytedance.com (mailing list archive)
State New, archived
Headers show
Series Use Intel DSA accelerator to offload zero page checking in multifd live migration. | expand

Commit Message

Hao Xiang Nov. 14, 2023, 5:40 a.m. UTC
* Create a dedicated thread for DSA task completion.
* DSA completion thread runs a loop and poll for completed tasks.
* Start and stop DSA completion thread during DSA device start stop.

User space application can directly submit task to Intel DSA
accelerator by writing to DSA's device memory (mapped in user space).
Once a task is submitted, the device starts processing it and write
the completion status back to the task. A user space application can
poll the task's completion status to check for completion. This change
uses a dedicated thread to perform DSA task completion checking.

Signed-off-by: Hao Xiang <hao.xiang@bytedance.com>
---
 util/dsa.c | 243 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 242 insertions(+), 1 deletion(-)

Comments

Fabiano Rosas Dec. 12, 2023, 7:36 p.m. UTC | #1
Hao Xiang <hao.xiang@bytedance.com> writes:

> * Create a dedicated thread for DSA task completion.
> * DSA completion thread runs a loop and poll for completed tasks.
> * Start and stop DSA completion thread during DSA device start stop.
>
> User space application can directly submit task to Intel DSA
> accelerator by writing to DSA's device memory (mapped in user space).
> Once a task is submitted, the device starts processing it and write
> the completion status back to the task. A user space application can
> poll the task's completion status to check for completion. This change
> uses a dedicated thread to perform DSA task completion checking.
>
> Signed-off-by: Hao Xiang <hao.xiang@bytedance.com>
> ---
>  util/dsa.c | 243 ++++++++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 242 insertions(+), 1 deletion(-)
>
> diff --git a/util/dsa.c b/util/dsa.c
> index f82282ce99..0e68013ffb 100644
> --- a/util/dsa.c
> +++ b/util/dsa.c
> @@ -44,6 +44,7 @@
>  
>  #define DSA_WQ_SIZE 4096
>  #define MAX_DSA_DEVICES 16
> +#define DSA_COMPLETION_THREAD "dsa_completion"
>  
>  typedef QSIMPLEQ_HEAD(dsa_task_queue, buffer_zero_batch_task) dsa_task_queue;
>  
> @@ -61,8 +62,18 @@ struct dsa_device_group {
>      dsa_task_queue task_queue;
>  };
>  
> +struct dsa_completion_thread {
> +    bool stopping;
> +    bool running;
> +    QemuThread thread;
> +    int thread_id;
> +    QemuSemaphore sem_init_done;
> +    struct dsa_device_group *group;
> +};
> +
>  uint64_t max_retry_count;
>  static struct dsa_device_group dsa_group;
> +static struct dsa_completion_thread completion_thread;
>  
>  
>  /**
> @@ -439,6 +450,234 @@ submit_batch_wi_async(struct buffer_zero_batch_task *batch_task)
>      return dsa_task_enqueue(device_group, batch_task);
>  }
>  
> +/**
> + * @brief Poll for the DSA work item completion.
> + *
> + * @param completion A pointer to the DSA work item completion record.
> + * @param opcode The DSA opcode.
> + *
> + * @return Zero if successful, non-zero otherwise.
> + */
> +static int
> +poll_completion(struct dsa_completion_record *completion,
> +                enum dsa_opcode opcode)
> +{
> +    uint8_t status;
> +    uint64_t retry = 0;
> +
> +    while (true) {
> +        // The DSA operation completes successfully or fails.
> +        status = completion->status;
> +        if (status == DSA_COMP_SUCCESS ||

Should we read directly from completion->status or is the compiler smart
enough to not optimize 'status' out?

> +            status == DSA_COMP_PAGE_FAULT_NOBOF ||
> +            status == DSA_COMP_BATCH_PAGE_FAULT ||
> +            status == DSA_COMP_BATCH_FAIL) {
> +            break;
> +        } else if (status != DSA_COMP_NONE) {
> +            /* TODO: Error handling here on unexpected failure. */

Let's make sure this is dealt with before merging.

> +            fprintf(stderr, "DSA opcode %d failed with status = %d.\n",
> +                    opcode, status);
> +            exit(1);

return instead of exiting.

> +        }
> +        retry++;
> +        if (retry > max_retry_count) {
> +            fprintf(stderr, "Wait for completion retry %lu times.\n", retry);
> +            exit(1);

same here

> +        }
> +        _mm_pause();
> +    }
> +
> +    return 0;
> +}
> +
> +/**
> + * @brief Complete a single DSA task in the batch task.
> + *
> + * @param task A pointer to the batch task structure.
> + */
> +static void
> +poll_task_completion(struct buffer_zero_batch_task *task)
> +{
> +    assert(task->task_type == DSA_TASK);
> +
> +    struct dsa_completion_record *completion = &task->completions[0];
> +    uint8_t status;
> +
> +    poll_completion(completion, task->descriptors[0].opcode);
> +
> +    status = completion->status;
> +    if (status == DSA_COMP_SUCCESS) {
> +        task->results[0] = (completion->result == 0);
> +        return;
> +    }
> +
> +    assert(status == DSA_COMP_PAGE_FAULT_NOBOF);
> +}
> +
> +/**
> + * @brief Poll a batch task status until it completes. If DSA task doesn't
> + *        complete properly, use CPU to complete the task.
> + *
> + * @param batch_task A pointer to the DSA batch task.
> + */
> +static void
> +poll_batch_task_completion(struct buffer_zero_batch_task *batch_task)
> +{
> +    struct dsa_completion_record *batch_completion = &batch_task->batch_completion;
> +    struct dsa_completion_record *completion;
> +    uint8_t batch_status;
> +    uint8_t status;
> +    bool *results = batch_task->results;
> +    uint32_t count = batch_task->batch_descriptor.desc_count;
> +
> +    poll_completion(batch_completion,
> +                    batch_task->batch_descriptor.opcode);
> +
> +    batch_status = batch_completion->status;
> +
> +    if (batch_status == DSA_COMP_SUCCESS) {
> +        if (batch_completion->bytes_completed == count) {
> +            // Let's skip checking for each descriptors' completion status
> +            // if the batch descriptor says all succedded.
> +            for (int i = 0; i < count; i++) {
> +                assert(batch_task->completions[i].status == DSA_COMP_SUCCESS);
> +                results[i] = (batch_task->completions[i].result == 0);
> +            }
> +            return;
> +        }
> +    } else {
> +        assert(batch_status == DSA_COMP_BATCH_FAIL ||
> +            batch_status == DSA_COMP_BATCH_PAGE_FAULT);
> +    }
> +
> +    for (int i = 0; i < count; i++) {
> +

extra whitespace

> +        completion = &batch_task->completions[i];
> +        status = completion->status;
> +
> +        if (status == DSA_COMP_SUCCESS) {
> +            results[i] = (completion->result == 0);
> +            continue;
> +        }
> +
> +        if (status != DSA_COMP_PAGE_FAULT_NOBOF) {
> +            fprintf(stderr,
> +                    "Unexpected completion status = %u.\n", status);
> +            assert(false);

return here

> +        }
> +    }
> +}
> +
> +/**
> + * @brief Handles an asynchronous DSA batch task completion.
> + *
> + * @param task A pointer to the batch buffer zero task structure.
> + */
> +static void
> +dsa_batch_task_complete(struct buffer_zero_batch_task *batch_task)
> +{
> +    batch_task->status = DSA_TASK_COMPLETION;
> +    batch_task->completion_callback(batch_task);
> +}
> +
> +/**
> + * @brief The function entry point called by a dedicated DSA
> + *        work item completion thread.
> + *
> + * @param opaque A pointer to the thread context.
> + *
> + * @return void* Not used.
> + */
> +static void *
> +dsa_completion_loop(void *opaque)
> +{
> +    struct dsa_completion_thread *thread_context =
> +        (struct dsa_completion_thread *)opaque;
> +    struct buffer_zero_batch_task *batch_task;
> +    struct dsa_device_group *group = thread_context->group;
> +
> +    rcu_register_thread();
> +
> +    thread_context->thread_id = qemu_get_thread_id();
> +    qemu_sem_post(&thread_context->sem_init_done);
> +
> +    while (thread_context->running) {
> +        batch_task = dsa_task_dequeue(group);
> +        assert(batch_task != NULL || !group->running);
> +        if (!group->running) {
> +            assert(!thread_context->running);

This is racy if the compiler reorders "thread_context->running = false"
and "group->running = false". I'd put this under the task_queue_lock or
add a compiler barrier at dsa_completion_thread_stop().

> +            break;
> +        }
> +        if (batch_task->task_type == DSA_TASK) {
> +            poll_task_completion(batch_task);
> +        } else {
> +            assert(batch_task->task_type == DSA_BATCH_TASK);
> +            poll_batch_task_completion(batch_task);
> +        }
> +
> +        dsa_batch_task_complete(batch_task);
> +    }
> +
> +    rcu_unregister_thread();
> +    return NULL;
> +}
> +
> +/**
> + * @brief Initializes a DSA completion thread.
> + *
> + * @param completion_thread A pointer to the completion thread context.
> + * @param group A pointer to the DSA device group.
> + */
> +static void
> +dsa_completion_thread_init(
> +    struct dsa_completion_thread *completion_thread,
> +    struct dsa_device_group *group)
> +{
> +    completion_thread->stopping = false;
> +    completion_thread->running = true;
> +    completion_thread->thread_id = -1;
> +    qemu_sem_init(&completion_thread->sem_init_done, 0);
> +    completion_thread->group = group;
> +
> +    qemu_thread_create(&completion_thread->thread,
> +                       DSA_COMPLETION_THREAD,
> +                       dsa_completion_loop,
> +                       completion_thread,
> +                       QEMU_THREAD_JOINABLE);
> +
> +    /* Wait for initialization to complete */
> +    while (completion_thread->thread_id == -1) {
> +        qemu_sem_wait(&completion_thread->sem_init_done);
> +    }

This is racy, the thread can set 'thread_id' before this enters the loop
and the semaphore will be left unmatched. Not a huge deal but it might
cause confusion when debugging the initialization.

> +}
> +
> +/**
> + * @brief Stops the completion thread (and implicitly, the device group).
> + *
> + * @param opaque A pointer to the completion thread.
> + */
> +static void dsa_completion_thread_stop(void *opaque)
> +{
> +    struct dsa_completion_thread *thread_context =
> +        (struct dsa_completion_thread *)opaque;
> +
> +    struct dsa_device_group *group = thread_context->group;
> +
> +    qemu_mutex_lock(&group->task_queue_lock);
> +
> +    thread_context->stopping = true;
> +    thread_context->running = false;
> +
> +    dsa_device_group_stop(group);
> +
> +    qemu_cond_signal(&group->task_queue_cond);
> +    qemu_mutex_unlock(&group->task_queue_lock);
> +
> +    qemu_thread_join(&thread_context->thread);
> +
> +    qemu_sem_destroy(&thread_context->sem_init_done);
> +}
> +
>  /**
>   * @brief Check if DSA is running.
>   *
> @@ -446,7 +685,7 @@ submit_batch_wi_async(struct buffer_zero_batch_task *batch_task)
>   */
>  bool dsa_is_running(void)
>  {
> -    return false;
> +    return completion_thread.running;
>  }
>  
>  static void
> @@ -481,6 +720,7 @@ void dsa_start(void)
>          return;
>      }
>      dsa_device_group_start(&dsa_group);
> +    dsa_completion_thread_init(&completion_thread, &dsa_group);
>  }
>  
>  /**
> @@ -496,6 +736,7 @@ void dsa_stop(void)
>          return;
>      }
>  
> +    dsa_completion_thread_stop(&completion_thread);
>      dsa_empty_task_queue(group);
>  }
Wang, Lei Dec. 18, 2023, 3:11 a.m. UTC | #2
On 11/14/2023 13:40, Hao Xiang wrote:> * Create a dedicated thread for DSA task
completion.
> * DSA completion thread runs a loop and poll for completed tasks.
> * Start and stop DSA completion thread during DSA device start stop.
> 
> User space application can directly submit task to Intel DSA
> accelerator by writing to DSA's device memory (mapped in user space).

> +            }
> +            return;
> +        }
> +    } else {
> +        assert(batch_status == DSA_COMP_BATCH_FAIL ||
> +            batch_status == DSA_COMP_BATCH_PAGE_FAULT);

Nit: indentation is broken here.

> +    }
> +
> +    for (int i = 0; i < count; i++) {
> +
> +        completion = &batch_task->completions[i];
> +        status = completion->status;
> +
> +        if (status == DSA_COMP_SUCCESS) {
> +            results[i] = (completion->result == 0);
> +            continue;
> +        }
> +
> +        if (status != DSA_COMP_PAGE_FAULT_NOBOF) {
> +            fprintf(stderr,
> +                    "Unexpected completion status = %u.\n", status);
> +            assert(false);
> +        }
> +    }
> +}
> +
> +/**
> + * @brief Handles an asynchronous DSA batch task completion.
> + *
> + * @param task A pointer to the batch buffer zero task structure.
> + */
> +static void
> +dsa_batch_task_complete(struct buffer_zero_batch_task *batch_task)
> +{
> +    batch_task->status = DSA_TASK_COMPLETION;
> +    batch_task->completion_callback(batch_task);
> +}
> +
> +/**
> + * @brief The function entry point called by a dedicated DSA
> + *        work item completion thread.
> + *
> + * @param opaque A pointer to the thread context.
> + *
> + * @return void* Not used.
> + */
> +static void *
> +dsa_completion_loop(void *opaque)

Per my understanding, if a multifd sending thread corresponds to a DSA device,
then the batch tasks are executed in parallel which means a task may be
completed slower than another even if this task is enqueued earlier than it. If
we poll on the slower task first it will block the handling of the faster one,
even if the zero checking task for that thread is finished and it can go ahead
and send the data to the wire, this may lower the network resource utilization.

> +{
> +    struct dsa_completion_thread *thread_context =
> +        (struct dsa_completion_thread *)opaque;
> +    struct buffer_zero_batch_task *batch_task;
> +    struct dsa_device_group *group = thread_context->group;
> +
> +    rcu_register_thread();
> +
> +    thread_context->thread_id = qemu_get_thread_id();
> +    qemu_sem_post(&thread_context->sem_init_done);
> +
> +    while (thread_context->running) {
> +        batch_task = dsa_task_dequeue(group);
> +        assert(batch_task != NULL || !group->running);
> +        if (!group->running) {
> +            assert(!thread_context->running);
> +            break;
> +        }
> +        if (batch_task->task_type == DSA_TASK) {
> +            poll_task_completion(batch_task);
> +        } else {
> +            assert(batch_task->task_type == DSA_BATCH_TASK);
> +            poll_batch_task_completion(batch_task);
> +        }
> +
> +        dsa_batch_task_complete(batch_task);
> +    }
> +
> +    rcu_unregister_thread();
> +    return NULL;
> +}
> +
> +/**
> + * @brief Initializes a DSA completion thread.
> + *
> + * @param completion_thread A pointer to the completion thread context.
> + * @param group A pointer to the DSA device group.
> + */
> +static void
> +dsa_completion_thread_init(
> +    struct dsa_completion_thread *completion_thread,
> +    struct dsa_device_group *group)
> +{
> +    completion_thread->stopping = false;
> +    completion_thread->running = true;
> +    completion_thread->thread_id = -1;
> +    qemu_sem_init(&completion_thread->sem_init_done, 0);
> +    completion_thread->group = group;
> +
> +    qemu_thread_create(&completion_thread->thread,
> +                       DSA_COMPLETION_THREAD,
> +                       dsa_completion_loop,
> +                       completion_thread,
> +                       QEMU_THREAD_JOINABLE);
> +
> +    /* Wait for initialization to complete */
> +    while (completion_thread->thread_id == -1) {
> +        qemu_sem_wait(&completion_thread->sem_init_done);
> +    }
> +}
> +
> +/**
> + * @brief Stops the completion thread (and implicitly, the device group).
> + *
> + * @param opaque A pointer to the completion thread.
> + */
> +static void dsa_completion_thread_stop(void *opaque)
> +{
> +    struct dsa_completion_thread *thread_context =
> +        (struct dsa_completion_thread *)opaque;
> +
> +    struct dsa_device_group *group = thread_context->group;
> +
> +    qemu_mutex_lock(&group->task_queue_lock);
> +
> +    thread_context->stopping = true;
> +    thread_context->running = false;
> +
> +    dsa_device_group_stop(group);
> +
> +    qemu_cond_signal(&group->task_queue_cond);
> +    qemu_mutex_unlock(&group->task_queue_lock);
> +
> +    qemu_thread_join(&thread_context->thread);
> +
> +    qemu_sem_destroy(&thread_context->sem_init_done);
> +}
> +
>  /**
>   * @brief Check if DSA is running.
>   *
> @@ -446,7 +685,7 @@ submit_batch_wi_async(struct buffer_zero_batch_task *batch_task)
>   */
>  bool dsa_is_running(void)
>  {
> -    return false;
> +    return completion_thread.running;
>  }
>  
>  static void
> @@ -481,6 +720,7 @@ void dsa_start(void)
>          return;
>      }
>      dsa_device_group_start(&dsa_group);
> +    dsa_completion_thread_init(&completion_thread, &dsa_group);
>  }
>  
>  /**
> @@ -496,6 +736,7 @@ void dsa_stop(void)
>          return;
>      }
>  
> +    dsa_completion_thread_stop(&completion_thread);
>      dsa_empty_task_queue(group);
>  }
>
Hao Xiang Dec. 18, 2023, 6:57 p.m. UTC | #3
On Sun, Dec 17, 2023 at 7:11 PM Wang, Lei <lei4.wang@intel.com> wrote:
>
> On 11/14/2023 13:40, Hao Xiang wrote:> * Create a dedicated thread for DSA task
> completion.
> > * DSA completion thread runs a loop and poll for completed tasks.
> > * Start and stop DSA completion thread during DSA device start stop.
> >
> > User space application can directly submit task to Intel DSA
> > accelerator by writing to DSA's device memory (mapped in user space).
>
> > +            }
> > +            return;
> > +        }
> > +    } else {
> > +        assert(batch_status == DSA_COMP_BATCH_FAIL ||
> > +            batch_status == DSA_COMP_BATCH_PAGE_FAULT);
>
> Nit: indentation is broken here.
>
> > +    }
> > +
> > +    for (int i = 0; i < count; i++) {
> > +
> > +        completion = &batch_task->completions[i];
> > +        status = completion->status;
> > +
> > +        if (status == DSA_COMP_SUCCESS) {
> > +            results[i] = (completion->result == 0);
> > +            continue;
> > +        }
> > +
> > +        if (status != DSA_COMP_PAGE_FAULT_NOBOF) {
> > +            fprintf(stderr,
> > +                    "Unexpected completion status = %u.\n", status);
> > +            assert(false);
> > +        }
> > +    }
> > +}
> > +
> > +/**
> > + * @brief Handles an asynchronous DSA batch task completion.
> > + *
> > + * @param task A pointer to the batch buffer zero task structure.
> > + */
> > +static void
> > +dsa_batch_task_complete(struct buffer_zero_batch_task *batch_task)
> > +{
> > +    batch_task->status = DSA_TASK_COMPLETION;
> > +    batch_task->completion_callback(batch_task);
> > +}
> > +
> > +/**
> > + * @brief The function entry point called by a dedicated DSA
> > + *        work item completion thread.
> > + *
> > + * @param opaque A pointer to the thread context.
> > + *
> > + * @return void* Not used.
> > + */
> > +static void *
> > +dsa_completion_loop(void *opaque)
>
> Per my understanding, if a multifd sending thread corresponds to a DSA device,
> then the batch tasks are executed in parallel which means a task may be
> completed slower than another even if this task is enqueued earlier than it. If
> we poll on the slower task first it will block the handling of the faster one,
> even if the zero checking task for that thread is finished and it can go ahead
> and send the data to the wire, this may lower the network resource utilization.
>

Hi Lei, thanks for reviewing. You are correct that we can keep pulling
a task enqueued first while others in the queue have already been
completed. In fact, only one DSA completion thread (pulling thread) is
used here even when multiple DSA devices are used. The pulling loop is
the most CPU intensive activity in the DSA workflow and that acts
directly against the goal of saving CPU usage. The trade-off I want to
take here is a slightly higher latency on DSA task completion but more
CPU savings. A single DSA engine can reach 30 GB/s throughput on
memory comparison operation. We use kernel tcp stack for network
transfer. The best I see is around 10GB/s throughput.  RDMA can
potentially go higher but I am not sure if it can go higher than 30
GB/s throughput anytime soon.

> > +{
> > +    struct dsa_completion_thread *thread_context =
> > +        (struct dsa_completion_thread *)opaque;
> > +    struct buffer_zero_batch_task *batch_task;
> > +    struct dsa_device_group *group = thread_context->group;
> > +
> > +    rcu_register_thread();
> > +
> > +    thread_context->thread_id = qemu_get_thread_id();
> > +    qemu_sem_post(&thread_context->sem_init_done);
> > +
> > +    while (thread_context->running) {
> > +        batch_task = dsa_task_dequeue(group);
> > +        assert(batch_task != NULL || !group->running);
> > +        if (!group->running) {
> > +            assert(!thread_context->running);
> > +            break;
> > +        }
> > +        if (batch_task->task_type == DSA_TASK) {
> > +            poll_task_completion(batch_task);
> > +        } else {
> > +            assert(batch_task->task_type == DSA_BATCH_TASK);
> > +            poll_batch_task_completion(batch_task);
> > +        }
> > +
> > +        dsa_batch_task_complete(batch_task);
> > +    }
> > +
> > +    rcu_unregister_thread();
> > +    return NULL;
> > +}
> > +
> > +/**
> > + * @brief Initializes a DSA completion thread.
> > + *
> > + * @param completion_thread A pointer to the completion thread context.
> > + * @param group A pointer to the DSA device group.
> > + */
> > +static void
> > +dsa_completion_thread_init(
> > +    struct dsa_completion_thread *completion_thread,
> > +    struct dsa_device_group *group)
> > +{
> > +    completion_thread->stopping = false;
> > +    completion_thread->running = true;
> > +    completion_thread->thread_id = -1;
> > +    qemu_sem_init(&completion_thread->sem_init_done, 0);
> > +    completion_thread->group = group;
> > +
> > +    qemu_thread_create(&completion_thread->thread,
> > +                       DSA_COMPLETION_THREAD,
> > +                       dsa_completion_loop,
> > +                       completion_thread,
> > +                       QEMU_THREAD_JOINABLE);
> > +
> > +    /* Wait for initialization to complete */
> > +    while (completion_thread->thread_id == -1) {
> > +        qemu_sem_wait(&completion_thread->sem_init_done);
> > +    }
> > +}
> > +
> > +/**
> > + * @brief Stops the completion thread (and implicitly, the device group).
> > + *
> > + * @param opaque A pointer to the completion thread.
> > + */
> > +static void dsa_completion_thread_stop(void *opaque)
> > +{
> > +    struct dsa_completion_thread *thread_context =
> > +        (struct dsa_completion_thread *)opaque;
> > +
> > +    struct dsa_device_group *group = thread_context->group;
> > +
> > +    qemu_mutex_lock(&group->task_queue_lock);
> > +
> > +    thread_context->stopping = true;
> > +    thread_context->running = false;
> > +
> > +    dsa_device_group_stop(group);
> > +
> > +    qemu_cond_signal(&group->task_queue_cond);
> > +    qemu_mutex_unlock(&group->task_queue_lock);
> > +
> > +    qemu_thread_join(&thread_context->thread);
> > +
> > +    qemu_sem_destroy(&thread_context->sem_init_done);
> > +}
> > +
> >  /**
> >   * @brief Check if DSA is running.
> >   *
> > @@ -446,7 +685,7 @@ submit_batch_wi_async(struct buffer_zero_batch_task *batch_task)
> >   */
> >  bool dsa_is_running(void)
> >  {
> > -    return false;
> > +    return completion_thread.running;
> >  }
> >
> >  static void
> > @@ -481,6 +720,7 @@ void dsa_start(void)
> >          return;
> >      }
> >      dsa_device_group_start(&dsa_group);
> > +    dsa_completion_thread_init(&completion_thread, &dsa_group);
> >  }
> >
> >  /**
> > @@ -496,6 +736,7 @@ void dsa_stop(void)
> >          return;
> >      }
> >
> > +    dsa_completion_thread_stop(&completion_thread);
> >      dsa_empty_task_queue(group);
> >  }
> >
Wang, Lei Dec. 19, 2023, 1:33 a.m. UTC | #4
On 12/19/2023 2:57, Hao Xiang wrote:> On Sun, Dec 17, 2023 at 7:11 PM Wang, Lei
<lei4.wang@intel.com> wrote:
>>
>> On 11/14/2023 13:40, Hao Xiang wrote:> * Create a dedicated thread for DSA task
>> completion.
>>> * DSA completion thread runs a loop and poll for completed tasks.
>>> * Start and stop DSA completion thread during DSA device start stop.
>>>
>>> User space application can directly submit task to Intel DSA
>>> accelerator by writing to DSA's device memory (mapped in user space).
>>
>>> +            }
>>> +            return;
>>> +        }
>>> +    } else {
>>> +        assert(batch_status == DSA_COMP_BATCH_FAIL ||
>>> +            batch_status == DSA_COMP_BATCH_PAGE_FAULT);
>>
>> Nit: indentation is broken here.
>>
>>> +    }
>>> +
>>> +    for (int i = 0; i < count; i++) {
>>> +
>>> +        completion = &batch_task->completions[i];
>>> +        status = completion->status;
>>> +
>>> +        if (status == DSA_COMP_SUCCESS) {
>>> +            results[i] = (completion->result == 0);
>>> +            continue;
>>> +        }
>>> +
>>> +        if (status != DSA_COMP_PAGE_FAULT_NOBOF) {
>>> +            fprintf(stderr,
>>> +                    "Unexpected completion status = %u.\n", status);
>>> +            assert(false);
>>> +        }
>>> +    }
>>> +}
>>> +
>>> +/**
>>> + * @brief Handles an asynchronous DSA batch task completion.
>>> + *
>>> + * @param task A pointer to the batch buffer zero task structure.
>>> + */
>>> +static void
>>> +dsa_batch_task_complete(struct buffer_zero_batch_task *batch_task)
>>> +{
>>> +    batch_task->status = DSA_TASK_COMPLETION;
>>> +    batch_task->completion_callback(batch_task);
>>> +}
>>> +
>>> +/**
>>> + * @brief The function entry point called by a dedicated DSA
>>> + *        work item completion thread.
>>> + *
>>> + * @param opaque A pointer to the thread context.
>>> + *
>>> + * @return void* Not used.
>>> + */
>>> +static void *
>>> +dsa_completion_loop(void *opaque)
>>
>> Per my understanding, if a multifd sending thread corresponds to a DSA device,
>> then the batch tasks are executed in parallel which means a task may be
>> completed slower than another even if this task is enqueued earlier than it. If
>> we poll on the slower task first it will block the handling of the faster one,
>> even if the zero checking task for that thread is finished and it can go ahead
>> and send the data to the wire, this may lower the network resource utilization.
>>
> 
> Hi Lei, thanks for reviewing. You are correct that we can keep pulling
> a task enqueued first while others in the queue have already been
> completed. In fact, only one DSA completion thread (pulling thread) is
> used here even when multiple DSA devices are used. The pulling loop is
> the most CPU intensive activity in the DSA workflow and that acts
> directly against the goal of saving CPU usage. The trade-off I want to
> take here is a slightly higher latency on DSA task completion but more
> CPU savings. A single DSA engine can reach 30 GB/s throughput on
> memory comparison operation. We use kernel tcp stack for network
> transfer. The best I see is around 10GB/s throughput.  RDMA can
> potentially go higher but I am not sure if it can go higher than 30
> GB/s throughput anytime soon.

Hi Hao, that makes sense, if the DSA is faster than the network, then a little
bit of latency in DSA checking is tolerable. In the long term, I think the best
form of the DSA task checking thread is to use an fd or such sort of thing that
can multiplex the checking of different DSA devices, then we can serve the DSA
task in the order they complete rather than FCFS.

> 
>>> +{
>>> +    struct dsa_completion_thread *thread_context =
>>> +        (struct dsa_completion_thread *)opaque;
>>> +    struct buffer_zero_batch_task *batch_task;
>>> +    struct dsa_device_group *group = thread_context->group;
>>> +
>>> +    rcu_register_thread();
>>> +
>>> +    thread_context->thread_id = qemu_get_thread_id();
>>> +    qemu_sem_post(&thread_context->sem_init_done);
>>> +
>>> +    while (thread_context->running) {
>>> +        batch_task = dsa_task_dequeue(group);
>>> +        assert(batch_task != NULL || !group->running);
>>> +        if (!group->running) {
>>> +            assert(!thread_context->running);
>>> +            break;
>>> +        }
>>> +        if (batch_task->task_type == DSA_TASK) {
>>> +            poll_task_completion(batch_task);
>>> +        } else {
>>> +            assert(batch_task->task_type == DSA_BATCH_TASK);
>>> +            poll_batch_task_completion(batch_task);
>>> +        }
>>> +
>>> +        dsa_batch_task_complete(batch_task);
>>> +    }
>>> +
>>> +    rcu_unregister_thread();
>>> +    return NULL;
>>> +}
>>> +
>>> +/**
>>> + * @brief Initializes a DSA completion thread.
>>> + *
>>> + * @param completion_thread A pointer to the completion thread context.
>>> + * @param group A pointer to the DSA device group.
>>> + */
>>> +static void
>>> +dsa_completion_thread_init(
>>> +    struct dsa_completion_thread *completion_thread,
>>> +    struct dsa_device_group *group)
>>> +{
>>> +    completion_thread->stopping = false;
>>> +    completion_thread->running = true;
>>> +    completion_thread->thread_id = -1;
>>> +    qemu_sem_init(&completion_thread->sem_init_done, 0);
>>> +    completion_thread->group = group;
>>> +
>>> +    qemu_thread_create(&completion_thread->thread,
>>> +                       DSA_COMPLETION_THREAD,
>>> +                       dsa_completion_loop,
>>> +                       completion_thread,
>>> +                       QEMU_THREAD_JOINABLE);
>>> +
>>> +    /* Wait for initialization to complete */
>>> +    while (completion_thread->thread_id == -1) {
>>> +        qemu_sem_wait(&completion_thread->sem_init_done);
>>> +    }
>>> +}
>>> +
>>> +/**
>>> + * @brief Stops the completion thread (and implicitly, the device group).
>>> + *
>>> + * @param opaque A pointer to the completion thread.
>>> + */
>>> +static void dsa_completion_thread_stop(void *opaque)
>>> +{
>>> +    struct dsa_completion_thread *thread_context =
>>> +        (struct dsa_completion_thread *)opaque;
>>> +
>>> +    struct dsa_device_group *group = thread_context->group;
>>> +
>>> +    qemu_mutex_lock(&group->task_queue_lock);
>>> +
>>> +    thread_context->stopping = true;
>>> +    thread_context->running = false;
>>> +
>>> +    dsa_device_group_stop(group);
>>> +
>>> +    qemu_cond_signal(&group->task_queue_cond);
>>> +    qemu_mutex_unlock(&group->task_queue_lock);
>>> +
>>> +    qemu_thread_join(&thread_context->thread);
>>> +
>>> +    qemu_sem_destroy(&thread_context->sem_init_done);
>>> +}
>>> +
>>>  /**
>>>   * @brief Check if DSA is running.
>>>   *
>>> @@ -446,7 +685,7 @@ submit_batch_wi_async(struct buffer_zero_batch_task *batch_task)
>>>   */
>>>  bool dsa_is_running(void)
>>>  {
>>> -    return false;
>>> +    return completion_thread.running;
>>>  }
>>>
>>>  static void
>>> @@ -481,6 +720,7 @@ void dsa_start(void)
>>>          return;
>>>      }
>>>      dsa_device_group_start(&dsa_group);
>>> +    dsa_completion_thread_init(&completion_thread, &dsa_group);
>>>  }
>>>
>>>  /**
>>> @@ -496,6 +736,7 @@ void dsa_stop(void)
>>>          return;
>>>      }
>>>
>>> +    dsa_completion_thread_stop(&completion_thread);
>>>      dsa_empty_task_queue(group);
>>>  }
>>>
Hao Xiang Dec. 19, 2023, 5:12 a.m. UTC | #5
On Mon, Dec 18, 2023 at 5:34 PM Wang, Lei <lei4.wang@intel.com> wrote:
>
> On 12/19/2023 2:57, Hao Xiang wrote:> On Sun, Dec 17, 2023 at 7:11 PM Wang, Lei
> <lei4.wang@intel.com> wrote:
> >>
> >> On 11/14/2023 13:40, Hao Xiang wrote:> * Create a dedicated thread for DSA task
> >> completion.
> >>> * DSA completion thread runs a loop and poll for completed tasks.
> >>> * Start and stop DSA completion thread during DSA device start stop.
> >>>
> >>> User space application can directly submit task to Intel DSA
> >>> accelerator by writing to DSA's device memory (mapped in user space).
> >>
> >>> +            }
> >>> +            return;
> >>> +        }
> >>> +    } else {
> >>> +        assert(batch_status == DSA_COMP_BATCH_FAIL ||
> >>> +            batch_status == DSA_COMP_BATCH_PAGE_FAULT);
> >>
> >> Nit: indentation is broken here.
> >>
> >>> +    }
> >>> +
> >>> +    for (int i = 0; i < count; i++) {
> >>> +
> >>> +        completion = &batch_task->completions[i];
> >>> +        status = completion->status;
> >>> +
> >>> +        if (status == DSA_COMP_SUCCESS) {
> >>> +            results[i] = (completion->result == 0);
> >>> +            continue;
> >>> +        }
> >>> +
> >>> +        if (status != DSA_COMP_PAGE_FAULT_NOBOF) {
> >>> +            fprintf(stderr,
> >>> +                    "Unexpected completion status = %u.\n", status);
> >>> +            assert(false);
> >>> +        }
> >>> +    }
> >>> +}
> >>> +
> >>> +/**
> >>> + * @brief Handles an asynchronous DSA batch task completion.
> >>> + *
> >>> + * @param task A pointer to the batch buffer zero task structure.
> >>> + */
> >>> +static void
> >>> +dsa_batch_task_complete(struct buffer_zero_batch_task *batch_task)
> >>> +{
> >>> +    batch_task->status = DSA_TASK_COMPLETION;
> >>> +    batch_task->completion_callback(batch_task);
> >>> +}
> >>> +
> >>> +/**
> >>> + * @brief The function entry point called by a dedicated DSA
> >>> + *        work item completion thread.
> >>> + *
> >>> + * @param opaque A pointer to the thread context.
> >>> + *
> >>> + * @return void* Not used.
> >>> + */
> >>> +static void *
> >>> +dsa_completion_loop(void *opaque)
> >>
> >> Per my understanding, if a multifd sending thread corresponds to a DSA device,
> >> then the batch tasks are executed in parallel which means a task may be
> >> completed slower than another even if this task is enqueued earlier than it. If
> >> we poll on the slower task first it will block the handling of the faster one,
> >> even if the zero checking task for that thread is finished and it can go ahead
> >> and send the data to the wire, this may lower the network resource utilization.
> >>
> >
> > Hi Lei, thanks for reviewing. You are correct that we can keep pulling
> > a task enqueued first while others in the queue have already been
> > completed. In fact, only one DSA completion thread (pulling thread) is
> > used here even when multiple DSA devices are used. The pulling loop is
> > the most CPU intensive activity in the DSA workflow and that acts
> > directly against the goal of saving CPU usage. The trade-off I want to
> > take here is a slightly higher latency on DSA task completion but more
> > CPU savings. A single DSA engine can reach 30 GB/s throughput on
> > memory comparison operation. We use kernel tcp stack for network
> > transfer. The best I see is around 10GB/s throughput.  RDMA can
> > potentially go higher but I am not sure if it can go higher than 30
> > GB/s throughput anytime soon.
>
> Hi Hao, that makes sense, if the DSA is faster than the network, then a little
> bit of latency in DSA checking is tolerable. In the long term, I think the best
> form of the DSA task checking thread is to use an fd or such sort of thing that
> can multiplex the checking of different DSA devices, then we can serve the DSA
> task in the order they complete rather than FCFS.
>
I have experimented using N completion threads and each thread pulls
tasks submitted to a particular DSA device. That approach uses too
many CPU cycles. If Intel can come up with a better workflow for DSA
completion, there is definitely space for improvement here.
> >
> >>> +{
> >>> +    struct dsa_completion_thread *thread_context =
> >>> +        (struct dsa_completion_thread *)opaque;
> >>> +    struct buffer_zero_batch_task *batch_task;
> >>> +    struct dsa_device_group *group = thread_context->group;
> >>> +
> >>> +    rcu_register_thread();
> >>> +
> >>> +    thread_context->thread_id = qemu_get_thread_id();
> >>> +    qemu_sem_post(&thread_context->sem_init_done);
> >>> +
> >>> +    while (thread_context->running) {
> >>> +        batch_task = dsa_task_dequeue(group);
> >>> +        assert(batch_task != NULL || !group->running);
> >>> +        if (!group->running) {
> >>> +            assert(!thread_context->running);
> >>> +            break;
> >>> +        }
> >>> +        if (batch_task->task_type == DSA_TASK) {
> >>> +            poll_task_completion(batch_task);
> >>> +        } else {
> >>> +            assert(batch_task->task_type == DSA_BATCH_TASK);
> >>> +            poll_batch_task_completion(batch_task);
> >>> +        }
> >>> +
> >>> +        dsa_batch_task_complete(batch_task);
> >>> +    }
> >>> +
> >>> +    rcu_unregister_thread();
> >>> +    return NULL;
> >>> +}
> >>> +
> >>> +/**
> >>> + * @brief Initializes a DSA completion thread.
> >>> + *
> >>> + * @param completion_thread A pointer to the completion thread context.
> >>> + * @param group A pointer to the DSA device group.
> >>> + */
> >>> +static void
> >>> +dsa_completion_thread_init(
> >>> +    struct dsa_completion_thread *completion_thread,
> >>> +    struct dsa_device_group *group)
> >>> +{
> >>> +    completion_thread->stopping = false;
> >>> +    completion_thread->running = true;
> >>> +    completion_thread->thread_id = -1;
> >>> +    qemu_sem_init(&completion_thread->sem_init_done, 0);
> >>> +    completion_thread->group = group;
> >>> +
> >>> +    qemu_thread_create(&completion_thread->thread,
> >>> +                       DSA_COMPLETION_THREAD,
> >>> +                       dsa_completion_loop,
> >>> +                       completion_thread,
> >>> +                       QEMU_THREAD_JOINABLE);
> >>> +
> >>> +    /* Wait for initialization to complete */
> >>> +    while (completion_thread->thread_id == -1) {
> >>> +        qemu_sem_wait(&completion_thread->sem_init_done);
> >>> +    }
> >>> +}
> >>> +
> >>> +/**
> >>> + * @brief Stops the completion thread (and implicitly, the device group).
> >>> + *
> >>> + * @param opaque A pointer to the completion thread.
> >>> + */
> >>> +static void dsa_completion_thread_stop(void *opaque)
> >>> +{
> >>> +    struct dsa_completion_thread *thread_context =
> >>> +        (struct dsa_completion_thread *)opaque;
> >>> +
> >>> +    struct dsa_device_group *group = thread_context->group;
> >>> +
> >>> +    qemu_mutex_lock(&group->task_queue_lock);
> >>> +
> >>> +    thread_context->stopping = true;
> >>> +    thread_context->running = false;
> >>> +
> >>> +    dsa_device_group_stop(group);
> >>> +
> >>> +    qemu_cond_signal(&group->task_queue_cond);
> >>> +    qemu_mutex_unlock(&group->task_queue_lock);
> >>> +
> >>> +    qemu_thread_join(&thread_context->thread);
> >>> +
> >>> +    qemu_sem_destroy(&thread_context->sem_init_done);
> >>> +}
> >>> +
> >>>  /**
> >>>   * @brief Check if DSA is running.
> >>>   *
> >>> @@ -446,7 +685,7 @@ submit_batch_wi_async(struct buffer_zero_batch_task *batch_task)
> >>>   */
> >>>  bool dsa_is_running(void)
> >>>  {
> >>> -    return false;
> >>> +    return completion_thread.running;
> >>>  }
> >>>
> >>>  static void
> >>> @@ -481,6 +720,7 @@ void dsa_start(void)
> >>>          return;
> >>>      }
> >>>      dsa_device_group_start(&dsa_group);
> >>> +    dsa_completion_thread_init(&completion_thread, &dsa_group);
> >>>  }
> >>>
> >>>  /**
> >>> @@ -496,6 +736,7 @@ void dsa_stop(void)
> >>>          return;
> >>>      }
> >>>
> >>> +    dsa_completion_thread_stop(&completion_thread);
> >>>      dsa_empty_task_queue(group);
> >>>  }
> >>>
diff mbox series

Patch

diff --git a/util/dsa.c b/util/dsa.c
index f82282ce99..0e68013ffb 100644
--- a/util/dsa.c
+++ b/util/dsa.c
@@ -44,6 +44,7 @@ 
 
 #define DSA_WQ_SIZE 4096
 #define MAX_DSA_DEVICES 16
+#define DSA_COMPLETION_THREAD "dsa_completion"
 
 typedef QSIMPLEQ_HEAD(dsa_task_queue, buffer_zero_batch_task) dsa_task_queue;
 
@@ -61,8 +62,18 @@  struct dsa_device_group {
     dsa_task_queue task_queue;
 };
 
+struct dsa_completion_thread {
+    bool stopping;
+    bool running;
+    QemuThread thread;
+    int thread_id;
+    QemuSemaphore sem_init_done;
+    struct dsa_device_group *group;
+};
+
 uint64_t max_retry_count;
 static struct dsa_device_group dsa_group;
+static struct dsa_completion_thread completion_thread;
 
 
 /**
@@ -439,6 +450,234 @@  submit_batch_wi_async(struct buffer_zero_batch_task *batch_task)
     return dsa_task_enqueue(device_group, batch_task);
 }
 
+/**
+ * @brief Poll for the DSA work item completion.
+ *
+ * @param completion A pointer to the DSA work item completion record.
+ * @param opcode The DSA opcode.
+ *
+ * @return Zero if successful, non-zero otherwise.
+ */
+static int
+poll_completion(struct dsa_completion_record *completion,
+                enum dsa_opcode opcode)
+{
+    uint8_t status;
+    uint64_t retry = 0;
+
+    while (true) {
+        // The DSA operation completes successfully or fails.
+        status = completion->status;
+        if (status == DSA_COMP_SUCCESS ||
+            status == DSA_COMP_PAGE_FAULT_NOBOF ||
+            status == DSA_COMP_BATCH_PAGE_FAULT ||
+            status == DSA_COMP_BATCH_FAIL) {
+            break;
+        } else if (status != DSA_COMP_NONE) {
+            /* TODO: Error handling here on unexpected failure. */
+            fprintf(stderr, "DSA opcode %d failed with status = %d.\n",
+                    opcode, status);
+            exit(1);
+        }
+        retry++;
+        if (retry > max_retry_count) {
+            fprintf(stderr, "Wait for completion retry %lu times.\n", retry);
+            exit(1);
+        }
+        _mm_pause();
+    }
+
+    return 0;
+}
+
+/**
+ * @brief Complete a single DSA task in the batch task.
+ *
+ * @param task A pointer to the batch task structure.
+ */
+static void
+poll_task_completion(struct buffer_zero_batch_task *task)
+{
+    assert(task->task_type == DSA_TASK);
+
+    struct dsa_completion_record *completion = &task->completions[0];
+    uint8_t status;
+
+    poll_completion(completion, task->descriptors[0].opcode);
+
+    status = completion->status;
+    if (status == DSA_COMP_SUCCESS) {
+        task->results[0] = (completion->result == 0);
+        return;
+    }
+
+    assert(status == DSA_COMP_PAGE_FAULT_NOBOF);
+}
+
+/**
+ * @brief Poll a batch task status until it completes. If DSA task doesn't
+ *        complete properly, use CPU to complete the task.
+ *
+ * @param batch_task A pointer to the DSA batch task.
+ */
+static void
+poll_batch_task_completion(struct buffer_zero_batch_task *batch_task)
+{
+    struct dsa_completion_record *batch_completion = &batch_task->batch_completion;
+    struct dsa_completion_record *completion;
+    uint8_t batch_status;
+    uint8_t status;
+    bool *results = batch_task->results;
+    uint32_t count = batch_task->batch_descriptor.desc_count;
+
+    poll_completion(batch_completion,
+                    batch_task->batch_descriptor.opcode);
+
+    batch_status = batch_completion->status;
+
+    if (batch_status == DSA_COMP_SUCCESS) {
+        if (batch_completion->bytes_completed == count) {
+            // Let's skip checking for each descriptors' completion status
+            // if the batch descriptor says all succedded.
+            for (int i = 0; i < count; i++) {
+                assert(batch_task->completions[i].status == DSA_COMP_SUCCESS);
+                results[i] = (batch_task->completions[i].result == 0);
+            }
+            return;
+        }
+    } else {
+        assert(batch_status == DSA_COMP_BATCH_FAIL ||
+            batch_status == DSA_COMP_BATCH_PAGE_FAULT);
+    }
+
+    for (int i = 0; i < count; i++) {
+
+        completion = &batch_task->completions[i];
+        status = completion->status;
+
+        if (status == DSA_COMP_SUCCESS) {
+            results[i] = (completion->result == 0);
+            continue;
+        }
+
+        if (status != DSA_COMP_PAGE_FAULT_NOBOF) {
+            fprintf(stderr,
+                    "Unexpected completion status = %u.\n", status);
+            assert(false);
+        }
+    }
+}
+
+/**
+ * @brief Handles an asynchronous DSA batch task completion.
+ *
+ * @param task A pointer to the batch buffer zero task structure.
+ */
+static void
+dsa_batch_task_complete(struct buffer_zero_batch_task *batch_task)
+{
+    batch_task->status = DSA_TASK_COMPLETION;
+    batch_task->completion_callback(batch_task);
+}
+
+/**
+ * @brief The function entry point called by a dedicated DSA
+ *        work item completion thread.
+ *
+ * @param opaque A pointer to the thread context.
+ *
+ * @return void* Not used.
+ */
+static void *
+dsa_completion_loop(void *opaque)
+{
+    struct dsa_completion_thread *thread_context =
+        (struct dsa_completion_thread *)opaque;
+    struct buffer_zero_batch_task *batch_task;
+    struct dsa_device_group *group = thread_context->group;
+
+    rcu_register_thread();
+
+    thread_context->thread_id = qemu_get_thread_id();
+    qemu_sem_post(&thread_context->sem_init_done);
+
+    while (thread_context->running) {
+        batch_task = dsa_task_dequeue(group);
+        assert(batch_task != NULL || !group->running);
+        if (!group->running) {
+            assert(!thread_context->running);
+            break;
+        }
+        if (batch_task->task_type == DSA_TASK) {
+            poll_task_completion(batch_task);
+        } else {
+            assert(batch_task->task_type == DSA_BATCH_TASK);
+            poll_batch_task_completion(batch_task);
+        }
+
+        dsa_batch_task_complete(batch_task);
+    }
+
+    rcu_unregister_thread();
+    return NULL;
+}
+
+/**
+ * @brief Initializes a DSA completion thread.
+ *
+ * @param completion_thread A pointer to the completion thread context.
+ * @param group A pointer to the DSA device group.
+ */
+static void
+dsa_completion_thread_init(
+    struct dsa_completion_thread *completion_thread,
+    struct dsa_device_group *group)
+{
+    completion_thread->stopping = false;
+    completion_thread->running = true;
+    completion_thread->thread_id = -1;
+    qemu_sem_init(&completion_thread->sem_init_done, 0);
+    completion_thread->group = group;
+
+    qemu_thread_create(&completion_thread->thread,
+                       DSA_COMPLETION_THREAD,
+                       dsa_completion_loop,
+                       completion_thread,
+                       QEMU_THREAD_JOINABLE);
+
+    /* Wait for initialization to complete */
+    while (completion_thread->thread_id == -1) {
+        qemu_sem_wait(&completion_thread->sem_init_done);
+    }
+}
+
+/**
+ * @brief Stops the completion thread (and implicitly, the device group).
+ *
+ * @param opaque A pointer to the completion thread.
+ */
+static void dsa_completion_thread_stop(void *opaque)
+{
+    struct dsa_completion_thread *thread_context =
+        (struct dsa_completion_thread *)opaque;
+
+    struct dsa_device_group *group = thread_context->group;
+
+    qemu_mutex_lock(&group->task_queue_lock);
+
+    thread_context->stopping = true;
+    thread_context->running = false;
+
+    dsa_device_group_stop(group);
+
+    qemu_cond_signal(&group->task_queue_cond);
+    qemu_mutex_unlock(&group->task_queue_lock);
+
+    qemu_thread_join(&thread_context->thread);
+
+    qemu_sem_destroy(&thread_context->sem_init_done);
+}
+
 /**
  * @brief Check if DSA is running.
  *
@@ -446,7 +685,7 @@  submit_batch_wi_async(struct buffer_zero_batch_task *batch_task)
  */
 bool dsa_is_running(void)
 {
-    return false;
+    return completion_thread.running;
 }
 
 static void
@@ -481,6 +720,7 @@  void dsa_start(void)
         return;
     }
     dsa_device_group_start(&dsa_group);
+    dsa_completion_thread_init(&completion_thread, &dsa_group);
 }
 
 /**
@@ -496,6 +736,7 @@  void dsa_stop(void)
         return;
     }
 
+    dsa_completion_thread_stop(&completion_thread);
     dsa_empty_task_queue(group);
 }