diff mbox

[08/16] blockjob: introduce .drain callback for jobs

Message ID 1455645388-32401-9-git-send-email-pbonzini@redhat.com (mailing list archive)
State New, archived
Headers show

Commit Message

Paolo Bonzini Feb. 16, 2016, 5:56 p.m. UTC
This is required to decouple block jobs from running in an
AioContext.  With multiqueue block devices, a BlockDriverState
does not really belong to a single AioContext.

The solution is to first wait until all I/O operations are
complete; then loop in the main thread for the block job to
complete entirely.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/backup.c           |  7 +++++++
 block/mirror.c           | 12 ++++++++++--
 blockjob.c               | 16 +++++++++++++---
 include/block/blockjob.h |  7 +++++++
 4 files changed, 37 insertions(+), 5 deletions(-)

Comments

Stefan Hajnoczi March 16, 2016, 5:56 p.m. UTC | #1
On Tue, Feb 16, 2016 at 06:56:20PM +0100, Paolo Bonzini wrote:
> This is required to decouple block jobs from running in an
> AioContext.  With multiqueue block devices, a BlockDriverState
> does not really belong to a single AioContext.
> 
> The solution is to first wait until all I/O operations are
> complete; then loop in the main thread for the block job to
> complete entirely.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  block/backup.c           |  7 +++++++
>  block/mirror.c           | 12 ++++++++++--
>  blockjob.c               | 16 +++++++++++++---
>  include/block/blockjob.h |  7 +++++++
>  4 files changed, 37 insertions(+), 5 deletions(-)
> 
> diff --git a/block/backup.c b/block/backup.c
> index 00cafdb..2edb895 100644
> --- a/block/backup.c
> +++ b/block/backup.c
> @@ -251,6 +251,12 @@ static void backup_abort(BlockJob *job)
>      }
>  }
>  
> +static void backup_drain(BlockJob *job)
> +{
> +    BackupBlockJob *s = container_of(job, BackupBlockJob, common);
> +    bdrv_drain(s->target);
> +}
> +
>  static const BlockJobDriver backup_job_driver = {
>      .instance_size  = sizeof(BackupBlockJob),
>      .job_type       = BLOCK_JOB_TYPE_BACKUP,
> @@ -258,6 +264,7 @@ static const BlockJobDriver backup_job_driver = {
>      .iostatus_reset = backup_iostatus_reset,
>      .commit         = backup_commit,
>      .abort          = backup_abort,
> +    .drain          = backup_drain,
>  };
>  
>  static BlockErrorAction backup_error_action(BackupBlockJob *job,
> diff --git a/block/mirror.c b/block/mirror.c
> index 3f163b8..b473a1b 100644
> --- a/block/mirror.c
> +++ b/block/mirror.c
> @@ -358,7 +358,7 @@ static void mirror_free_init(MirrorBlockJob *s)
>      }
>  }
>  
> -static void mirror_drain(MirrorBlockJob *s)
> +static void mirror_wait_for_completion(MirrorBlockJob *s)
>  {
>      while (s->in_flight > 0) {
>          s->waiting_for_io = true;
> @@ -627,7 +627,7 @@ immediate_exit:
>           * the target is a copy of the source.
>           */
>          assert(ret < 0 || (!s->synced && block_job_is_cancelled(&s->common)));
> -        mirror_drain(s);
> +        mirror_wait_for_completion(s);
>      }
>  
>      assert(s->in_flight == 0);
> @@ -708,12 +708,19 @@ static void mirror_complete(BlockJob *job, Error **errp)
>      block_job_enter(&s->common);
>  }
>  
> +static void mirror_drain(BlockJob *job)
> +{
> +    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
> +    bdrv_drain(s->target);
> +}
> +
>  static const BlockJobDriver mirror_job_driver = {
>      .instance_size = sizeof(MirrorBlockJob),
>      .job_type      = BLOCK_JOB_TYPE_MIRROR,
>      .set_speed     = mirror_set_speed,
>      .iostatus_reset= mirror_iostatus_reset,
>      .complete      = mirror_complete,
> +    .drain         = mirror_drain,
>  };
>  
>  static const BlockJobDriver commit_active_job_driver = {
> @@ -723,6 +730,7 @@ static const BlockJobDriver commit_active_job_driver = {
>      .iostatus_reset
>                     = mirror_iostatus_reset,
>      .complete      = mirror_complete,
> +    .drain         = mirror_drain,
>  };
>  
>  static void mirror_start_job(BlockDriverState *bs, BlockDriverState *target,
> diff --git a/blockjob.c b/blockjob.c
> index 9fc37ca..5bb6f9b 100644
> --- a/blockjob.c
> +++ b/blockjob.c
> @@ -289,16 +289,26 @@ static int block_job_finish_sync(BlockJob *job,
>      assert(bs->job == job);
>  
>      block_job_ref(job);
> +
> +    /* finish will call block_job_enter (see e.g. block_job_cancel,
> +     * or mirror_complete in block/mirror.c).  Barring bugs in the
> +     * job coroutine, bdrv_drain should be enough to induce progress
> +     * until the job completes or moves to the main thread.
> +    */
>      finish(job, &local_err);
>      if (local_err) {
>          error_propagate(errp, local_err);
>          block_job_unref(job);
>          return -EBUSY;
>      }
> +    while (!job->deferred_to_main_loop && !job->completed) {
> +        bdrv_drain(bs);
> +        if (job->driver->drain) {
> +            job->driver->drain(job);
> +        }
> +    }
>      while (!job->completed) {
> -        aio_poll(job->deferred_to_main_loop ? qemu_get_aio_context() :
> -                                              bdrv_get_aio_context(bs),
> -                 true);
> +        aio_poll(qemu_get_aio_context(), true);
>      }

This adds the assumption that block_job_defer_to_main_loop() is only
used to complete the block job.  If we really need to make this
assumption, then we could rename block_job_defer_to_main_loop() to
complete_in_main_loop() or similar so that this is clear.

>      ret = (job->cancelled && job->ret == 0) ? -ECANCELED : job->ret;
>      block_job_unref(job);
> diff --git a/include/block/blockjob.h b/include/block/blockjob.h
> index 8bedc49..8e564df 100644
> --- a/include/block/blockjob.h
> +++ b/include/block/blockjob.h
> @@ -70,6 +70,13 @@ typedef struct BlockJobDriver {
>       * never both.
>       */
>      void (*abort)(BlockJob *job);
> +
> +    /**
> +     * If the callback is not NULL, it will be invoked when the job has to be
> +     * synchronously cancelled or completed; it should drain BlockDriverStates
> +     * as required to ensure progress.
> +     */
> +    void (*drain)(BlockJob *job);
>  } BlockJobDriver;
>  
>  /**
> -- 
> 2.5.0
> 
>
diff mbox

Patch

diff --git a/block/backup.c b/block/backup.c
index 00cafdb..2edb895 100644
--- a/block/backup.c
+++ b/block/backup.c
@@ -251,6 +251,12 @@  static void backup_abort(BlockJob *job)
     }
 }
 
+static void backup_drain(BlockJob *job)
+{
+    BackupBlockJob *s = container_of(job, BackupBlockJob, common);
+    bdrv_drain(s->target);
+}
+
 static const BlockJobDriver backup_job_driver = {
     .instance_size  = sizeof(BackupBlockJob),
     .job_type       = BLOCK_JOB_TYPE_BACKUP,
@@ -258,6 +264,7 @@  static const BlockJobDriver backup_job_driver = {
     .iostatus_reset = backup_iostatus_reset,
     .commit         = backup_commit,
     .abort          = backup_abort,
+    .drain          = backup_drain,
 };
 
 static BlockErrorAction backup_error_action(BackupBlockJob *job,
diff --git a/block/mirror.c b/block/mirror.c
index 3f163b8..b473a1b 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -358,7 +358,7 @@  static void mirror_free_init(MirrorBlockJob *s)
     }
 }
 
-static void mirror_drain(MirrorBlockJob *s)
+static void mirror_wait_for_completion(MirrorBlockJob *s)
 {
     while (s->in_flight > 0) {
         s->waiting_for_io = true;
@@ -627,7 +627,7 @@  immediate_exit:
          * the target is a copy of the source.
          */
         assert(ret < 0 || (!s->synced && block_job_is_cancelled(&s->common)));
-        mirror_drain(s);
+        mirror_wait_for_completion(s);
     }
 
     assert(s->in_flight == 0);
@@ -708,12 +708,19 @@  static void mirror_complete(BlockJob *job, Error **errp)
     block_job_enter(&s->common);
 }
 
+static void mirror_drain(BlockJob *job)
+{
+    MirrorBlockJob *s = container_of(job, MirrorBlockJob, common);
+    bdrv_drain(s->target);
+}
+
 static const BlockJobDriver mirror_job_driver = {
     .instance_size = sizeof(MirrorBlockJob),
     .job_type      = BLOCK_JOB_TYPE_MIRROR,
     .set_speed     = mirror_set_speed,
     .iostatus_reset= mirror_iostatus_reset,
     .complete      = mirror_complete,
+    .drain         = mirror_drain,
 };
 
 static const BlockJobDriver commit_active_job_driver = {
@@ -723,6 +730,7 @@  static const BlockJobDriver commit_active_job_driver = {
     .iostatus_reset
                    = mirror_iostatus_reset,
     .complete      = mirror_complete,
+    .drain         = mirror_drain,
 };
 
 static void mirror_start_job(BlockDriverState *bs, BlockDriverState *target,
diff --git a/blockjob.c b/blockjob.c
index 9fc37ca..5bb6f9b 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -289,16 +289,26 @@  static int block_job_finish_sync(BlockJob *job,
     assert(bs->job == job);
 
     block_job_ref(job);
+
+    /* finish will call block_job_enter (see e.g. block_job_cancel,
+     * or mirror_complete in block/mirror.c).  Barring bugs in the
+     * job coroutine, bdrv_drain should be enough to induce progress
+     * until the job completes or moves to the main thread.
+    */
     finish(job, &local_err);
     if (local_err) {
         error_propagate(errp, local_err);
         block_job_unref(job);
         return -EBUSY;
     }
+    while (!job->deferred_to_main_loop && !job->completed) {
+        bdrv_drain(bs);
+        if (job->driver->drain) {
+            job->driver->drain(job);
+        }
+    }
     while (!job->completed) {
-        aio_poll(job->deferred_to_main_loop ? qemu_get_aio_context() :
-                                              bdrv_get_aio_context(bs),
-                 true);
+        aio_poll(qemu_get_aio_context(), true);
     }
     ret = (job->cancelled && job->ret == 0) ? -ECANCELED : job->ret;
     block_job_unref(job);
diff --git a/include/block/blockjob.h b/include/block/blockjob.h
index 8bedc49..8e564df 100644
--- a/include/block/blockjob.h
+++ b/include/block/blockjob.h
@@ -70,6 +70,13 @@  typedef struct BlockJobDriver {
      * never both.
      */
     void (*abort)(BlockJob *job);
+
+    /**
+     * If the callback is not NULL, it will be invoked when the job has to be
+     * synchronously cancelled or completed; it should drain BlockDriverStates
+     * as required to ensure progress.
+     */
+    void (*drain)(BlockJob *job);
 } BlockJobDriver;
 
 /**