diff mbox series

[v4,5/6] block-copy: add a CoMutex

Message ID 20210614073350.17048-6-eesposit@redhat.com (mailing list archive)
State New, archived
Headers show
Series block-copy: protect block-copy internal structures | expand

Commit Message

Emanuele Giuseppe Esposito June 14, 2021, 7:33 a.m. UTC
Add a CoMutex to protect concurrent access of block-copy
data structures.

This mutex also protects .copy_bitmap, because its thread-safe
API does not prevent it from assigning two tasks to the same
bitmap region.

.finished, .cancelled and reads to .ret and .error_is_read will be
protected in the following patch, because are used also outside
coroutines.

Also set block_copy_task_create as coroutine_fn because:
1) it is static and only invoked by coroutine functions
2) this patch introduces and uses a CoMutex lock there

Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
---
 block/block-copy.c         | 79 +++++++++++++++++++++++++++++---------
 include/block/block-copy.h |  2 +
 2 files changed, 62 insertions(+), 19 deletions(-)

Comments

Vladimir Sementsov-Ogievskiy June 19, 2021, 7:34 p.m. UTC | #1
14.06.2021 10:33, Emanuele Giuseppe Esposito wrote:
> Add a CoMutex to protect concurrent access of block-copy
> data structures.
> 
> This mutex also protects .copy_bitmap, because its thread-safe
> API does not prevent it from assigning two tasks to the same
> bitmap region.
> 
> .finished, .cancelled and reads to .ret and .error_is_read will be
> protected in the following patch, because are used also outside
> coroutines.
> 
> Also set block_copy_task_create as coroutine_fn because:
> 1) it is static and only invoked by coroutine functions
> 2) this patch introduces and uses a CoMutex lock there
> 
> Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
> ---
>   block/block-copy.c         | 79 +++++++++++++++++++++++++++++---------
>   include/block/block-copy.h |  2 +
>   2 files changed, 62 insertions(+), 19 deletions(-)
> 
> diff --git a/block/block-copy.c b/block/block-copy.c
> index afa2f484f0..6416929abd 100644
> --- a/block/block-copy.c
> +++ b/block/block-copy.c
> @@ -61,6 +61,7 @@ typedef struct BlockCopyCallState {
>   
>       /* OUT parameters */
>       bool cancelled;
> +    /* Fields protected by lock in BlockCopyState */
>       bool error_is_read;
>       int ret;
>   } BlockCopyCallState;
> @@ -77,8 +78,12 @@ typedef struct BlockCopyTask {
>       int64_t offset;
>       BlockCopyMethod method;
>   
> -    /* State */
> +    /* State. Protected by lock in BlockCopyState */
>       CoQueue wait_queue; /* coroutines blocked on this task */
> +    /*
> +     * Only protect the case of parallel read while updating @bytes
> +     * value in block_copy_task_shrink().
> +     */

So, you have to add new comments and modify comment added in previous commit. That's another sign to just merge patch 03 here.

>       int64_t bytes;
>       QLIST_ENTRY(BlockCopyTask) list;
>   } BlockCopyTask;
> @@ -97,7 +102,8 @@ typedef struct BlockCopyState {
>       BdrvChild *source;
>       BdrvChild *target;
>   
> -    /* State */
> +    /* State. Protected by lock */
> +    CoMutex lock;
>       int64_t in_flight_bytes;
>       BlockCopyMethod method;
>       QLIST_HEAD(, BlockCopyTask) tasks; /* All tasks from all block-copy calls */
> @@ -137,6 +143,7 @@ typedef struct BlockCopyState {
>       bool skip_unallocated;
>   } BlockCopyState;
>   
> +/* Called with lock held */
>   static BlockCopyTask *find_conflicting_task(BlockCopyState *s,
>                                               int64_t offset, int64_t bytes)
>   {
> @@ -154,6 +161,8 @@ static BlockCopyTask *find_conflicting_task(BlockCopyState *s,
>   /*
>    * If there are no intersecting tasks return false. Otherwise, wait for the
>    * first found intersecting tasks to finish and return true.
> + *
> + * Called with lock held.

Please:

Called with lock held, may temporary release the lock. Return value of 0 proves that lock was NOT released.

>    */
>   static bool coroutine_fn block_copy_wait_one(BlockCopyState *s, int64_t offset,
>                                                int64_t bytes)
> @@ -164,7 +173,7 @@ static bool coroutine_fn block_copy_wait_one(BlockCopyState *s, int64_t offset,
>           return false;
>       }
>   
> -    qemu_co_queue_wait(&task->wait_queue, NULL);
> +    qemu_co_queue_wait(&task->wait_queue, &s->lock);
>   
>       return true;
>   }
> @@ -191,14 +200,15 @@ static int64_t block_copy_chunk_size(BlockCopyState *s)

Hmm, you had /* Called with lock held */ comment for block_copy_chunk_size() in a previous version.. Why dropped?

>    * Search for the first dirty area in offset/bytes range and create task at
>    * the beginning of it.
>    */
> -static BlockCopyTask *block_copy_task_create(BlockCopyState *s,
> -                                             BlockCopyCallState *call_state,
> -                                             int64_t offset, int64_t bytes)
> +static coroutine_fn BlockCopyTask *block_copy_task_create(BlockCopyState *s,
> +                                                BlockCopyCallState *call_state,
> +                                                int64_t offset, int64_t bytes)

That breaks Qemu coding style (docs/devel/style.rst  "Multiline Indent")

Function "type" becomes too long. Common practice is keep a type on a separate line:

static coroutine_fn BlockCopyTask *
block_copy_task_create(BlockCopyState *s, BlockCopyCallState *call_state,
                        int64_t offset, int64_t bytes)

(and this is used in the file in one place, so it would be consistent do same thing here)

Another way is 4-spaces indent:

static coroutine_fn BlockCopyTask *block_copy_task_create(BlockCopyState *s,
     BlockCopyCallState *call_state, int64_t offset, int64_t bytes)


choose what you want.

>   {
>       BlockCopyTask *task;
> -    int64_t max_chunk = block_copy_chunk_size(s);
> +    int64_t max_chunk;
>   
> -    max_chunk = MIN_NON_ZERO(max_chunk, call_state->max_chunk);
> +    QEMU_LOCK_GUARD(&s->lock);
> +    max_chunk = MIN_NON_ZERO(block_copy_chunk_size(s), call_state->max_chunk);
>       if (!bdrv_dirty_bitmap_next_dirty_area(s->copy_bitmap,
>                                              offset, offset + bytes,
>                                              max_chunk, &offset, &bytes))
> @@ -240,6 +250,7 @@ static BlockCopyTask *block_copy_task_create(BlockCopyState *s,
>   static void coroutine_fn block_copy_task_shrink(BlockCopyTask *task,
>                                                   int64_t new_bytes)
>   {
> +    QEMU_LOCK_GUARD(&task->s->lock);
>       if (new_bytes == task->bytes) {
>           return;
>       }
> @@ -256,6 +267,7 @@ static void coroutine_fn block_copy_task_shrink(BlockCopyTask *task,
>   
>   static void coroutine_fn block_copy_task_end(BlockCopyTask *task, int ret)
>   {
> +    QEMU_LOCK_GUARD(&task->s->lock);
>       task->s->in_flight_bytes -= task->bytes;
>       if (ret < 0) {
>           bdrv_set_dirty_bitmap(task->s->copy_bitmap, task->offset, task->bytes);
> @@ -334,12 +346,14 @@ BlockCopyState *block_copy_state_new(BdrvChild *source, BdrvChild *target,
>       }
>   
>       ratelimit_init(&s->rate_limit);
> +    qemu_co_mutex_init(&s->lock);
>       QLIST_INIT(&s->tasks);
>       QLIST_INIT(&s->calls);
>   
>       return s;
>   }
>   
> +/* Only set before running the job, so it is thread safe to call. */

It's not thread-safe to call, it's just not used so that we don't have to care abot thread-safety.. So, maybe:

Only set before running the job, no need for locking.

>   void block_copy_set_progress_meter(BlockCopyState *s, ProgressMeter *pm)
>   {
>       s->progress = pm;
> @@ -480,16 +494,20 @@ static coroutine_fn int block_copy_task_entry(AioTask *task)
>       int ret;
>   
>       ret = block_copy_do_copy(s, t->offset, t->bytes, &method, &error_is_read);
> -    if (s->method == t->method) {
> -        s->method = method;
> -    }
> -    if (ret < 0) {
> -        if (!t->call_state->ret) {
> -            t->call_state->ret = ret;
> -            t->call_state->error_is_read = error_is_read;
> +
> +    WITH_QEMU_LOCK_GUARD(&s->lock) {
> +        if (s->method == t->method) {
> +            s->method = method;
> +        }
> +
> +        if (ret < 0) {
> +            if (!t->call_state->ret) {
> +                t->call_state->ret = ret;
> +                t->call_state->error_is_read = error_is_read;
> +            }
> +        } else {
> +            progress_work_done(s->progress, t->bytes);
>           }
> -    } else {
> -        progress_work_done(s->progress, t->bytes);
>       }
>       co_put_to_shres(s->mem, t->bytes);
>       block_copy_task_end(t, ret);
> @@ -591,10 +609,12 @@ int64_t block_copy_reset_unallocated(BlockCopyState *s,
>       bytes = clusters * s->cluster_size;
>   
>       if (!ret) {
> +        qemu_co_mutex_lock(&s->lock);
>           bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes);
>           progress_set_remaining(s->progress,
>                                  bdrv_get_dirty_count(s->copy_bitmap) +
>                                  s->in_flight_bytes);
> +        qemu_co_mutex_unlock(&s->lock);
>       }
>   
>       *count = bytes;
> @@ -734,14 +754,33 @@ static int coroutine_fn block_copy_common(BlockCopyCallState *call_state)
>       int ret;
>       BlockCopyState *s = call_state->s;
>   
> +    qemu_co_mutex_lock(&s->lock);
>       QLIST_INSERT_HEAD(&s->calls, call_state, list);
> +    qemu_co_mutex_unlock(&s->lock);
>   
>       do {
>           ret = block_copy_dirty_clusters(call_state);
>   
>           if (ret == 0 && !call_state->cancelled) {
> -            ret = block_copy_wait_one(s, call_state->offset,
> -                                      call_state->bytes);
> +            WITH_QEMU_LOCK_GUARD(&s->lock) {
> +                /*
> +                 * Check that there is no task we still need to
> +                 * wait to complete
> +                 */
> +                ret = block_copy_wait_one(s, call_state->offset,
> +                                          call_state->bytes);
> +                if (ret == 0) {
> +                    /*

Please add: block_copy_wait_one return value 0 also means that it didn't relase the lock. So, we are still in the same critical section, not interrupted by any concurrent access to state.

(actually, I started to write a comment that critical section is broken here, but understood that all is OK)

> +                     * No pending tasks, but check again the bitmap in this
> +                     * same critical section, since a task might have failed
> +                     * between this and the critical section in
> +                     * block_copy_dirty_clusters().
> +                     */
> +                    ret = bdrv_dirty_bitmap_next_dirty(s->copy_bitmap,
> +                                                       call_state->offset,
> +                                                       call_state->bytes) > 0;

It should be s/> 0/>= 0/. bdrv_dirty_bitmap_next_dirty() returns -1 if not found. And 0 is valid offset.

> +                }
> +            }
>           }
k>
>           /*
> @@ -761,7 +800,9 @@ static int coroutine_fn block_copy_common(BlockCopyCallState *call_state)
>           call_state->cb(call_state->cb_opaque);
>       }
>   
> +    qemu_co_mutex_lock(&s->lock);
>       QLIST_REMOVE(call_state, list);
> +    qemu_co_mutex_unlock(&s->lock);
>   
>       return ret;
>   }
> diff --git a/include/block/block-copy.h b/include/block/block-copy.h
> index 338f2ea7fd..bf8f0c679f 100644
> --- a/include/block/block-copy.h
> +++ b/include/block/block-copy.h

Please, add

[diff]
     orderFile = /path/to/qemu/scripts/git.orderfile

to your .git/config file, for good ordering of files inside a patch. For example, headers would be at the top of the patch.

> @@ -18,6 +18,8 @@
>   #include "block/block.h"
>   #include "qemu/co-shared-resource.h"
>   
> +/* All APIs are thread-safe*/

To be honest, you can add this comment only in the following commit.
Also, whitespace missed.

> +
>   typedef void (*BlockCopyAsyncCallbackFunc)(void *opaque);
>   typedef struct BlockCopyState BlockCopyState;
>   typedef struct BlockCopyCallState BlockCopyCallState;
> 

Overall, looks close :)
diff mbox series

Patch

diff --git a/block/block-copy.c b/block/block-copy.c
index afa2f484f0..6416929abd 100644
--- a/block/block-copy.c
+++ b/block/block-copy.c
@@ -61,6 +61,7 @@  typedef struct BlockCopyCallState {
 
     /* OUT parameters */
     bool cancelled;
+    /* Fields protected by lock in BlockCopyState */
     bool error_is_read;
     int ret;
 } BlockCopyCallState;
@@ -77,8 +78,12 @@  typedef struct BlockCopyTask {
     int64_t offset;
     BlockCopyMethod method;
 
-    /* State */
+    /* State. Protected by lock in BlockCopyState */
     CoQueue wait_queue; /* coroutines blocked on this task */
+    /*
+     * Only protect the case of parallel read while updating @bytes
+     * value in block_copy_task_shrink().
+     */
     int64_t bytes;
     QLIST_ENTRY(BlockCopyTask) list;
 } BlockCopyTask;
@@ -97,7 +102,8 @@  typedef struct BlockCopyState {
     BdrvChild *source;
     BdrvChild *target;
 
-    /* State */
+    /* State. Protected by lock */
+    CoMutex lock;
     int64_t in_flight_bytes;
     BlockCopyMethod method;
     QLIST_HEAD(, BlockCopyTask) tasks; /* All tasks from all block-copy calls */
@@ -137,6 +143,7 @@  typedef struct BlockCopyState {
     bool skip_unallocated;
 } BlockCopyState;
 
+/* Called with lock held */
 static BlockCopyTask *find_conflicting_task(BlockCopyState *s,
                                             int64_t offset, int64_t bytes)
 {
@@ -154,6 +161,8 @@  static BlockCopyTask *find_conflicting_task(BlockCopyState *s,
 /*
  * If there are no intersecting tasks return false. Otherwise, wait for the
  * first found intersecting tasks to finish and return true.
+ *
+ * Called with lock held.
  */
 static bool coroutine_fn block_copy_wait_one(BlockCopyState *s, int64_t offset,
                                              int64_t bytes)
@@ -164,7 +173,7 @@  static bool coroutine_fn block_copy_wait_one(BlockCopyState *s, int64_t offset,
         return false;
     }
 
-    qemu_co_queue_wait(&task->wait_queue, NULL);
+    qemu_co_queue_wait(&task->wait_queue, &s->lock);
 
     return true;
 }
@@ -191,14 +200,15 @@  static int64_t block_copy_chunk_size(BlockCopyState *s)
  * Search for the first dirty area in offset/bytes range and create task at
  * the beginning of it.
  */
-static BlockCopyTask *block_copy_task_create(BlockCopyState *s,
-                                             BlockCopyCallState *call_state,
-                                             int64_t offset, int64_t bytes)
+static coroutine_fn BlockCopyTask *block_copy_task_create(BlockCopyState *s,
+                                                BlockCopyCallState *call_state,
+                                                int64_t offset, int64_t bytes)
 {
     BlockCopyTask *task;
-    int64_t max_chunk = block_copy_chunk_size(s);
+    int64_t max_chunk;
 
-    max_chunk = MIN_NON_ZERO(max_chunk, call_state->max_chunk);
+    QEMU_LOCK_GUARD(&s->lock);
+    max_chunk = MIN_NON_ZERO(block_copy_chunk_size(s), call_state->max_chunk);
     if (!bdrv_dirty_bitmap_next_dirty_area(s->copy_bitmap,
                                            offset, offset + bytes,
                                            max_chunk, &offset, &bytes))
@@ -240,6 +250,7 @@  static BlockCopyTask *block_copy_task_create(BlockCopyState *s,
 static void coroutine_fn block_copy_task_shrink(BlockCopyTask *task,
                                                 int64_t new_bytes)
 {
+    QEMU_LOCK_GUARD(&task->s->lock);
     if (new_bytes == task->bytes) {
         return;
     }
@@ -256,6 +267,7 @@  static void coroutine_fn block_copy_task_shrink(BlockCopyTask *task,
 
 static void coroutine_fn block_copy_task_end(BlockCopyTask *task, int ret)
 {
+    QEMU_LOCK_GUARD(&task->s->lock);
     task->s->in_flight_bytes -= task->bytes;
     if (ret < 0) {
         bdrv_set_dirty_bitmap(task->s->copy_bitmap, task->offset, task->bytes);
@@ -334,12 +346,14 @@  BlockCopyState *block_copy_state_new(BdrvChild *source, BdrvChild *target,
     }
 
     ratelimit_init(&s->rate_limit);
+    qemu_co_mutex_init(&s->lock);
     QLIST_INIT(&s->tasks);
     QLIST_INIT(&s->calls);
 
     return s;
 }
 
+/* Only set before running the job, so it is thread safe to call. */
 void block_copy_set_progress_meter(BlockCopyState *s, ProgressMeter *pm)
 {
     s->progress = pm;
@@ -480,16 +494,20 @@  static coroutine_fn int block_copy_task_entry(AioTask *task)
     int ret;
 
     ret = block_copy_do_copy(s, t->offset, t->bytes, &method, &error_is_read);
-    if (s->method == t->method) {
-        s->method = method;
-    }
-    if (ret < 0) {
-        if (!t->call_state->ret) {
-            t->call_state->ret = ret;
-            t->call_state->error_is_read = error_is_read;
+
+    WITH_QEMU_LOCK_GUARD(&s->lock) {
+        if (s->method == t->method) {
+            s->method = method;
+        }
+
+        if (ret < 0) {
+            if (!t->call_state->ret) {
+                t->call_state->ret = ret;
+                t->call_state->error_is_read = error_is_read;
+            }
+        } else {
+            progress_work_done(s->progress, t->bytes);
         }
-    } else {
-        progress_work_done(s->progress, t->bytes);
     }
     co_put_to_shres(s->mem, t->bytes);
     block_copy_task_end(t, ret);
@@ -591,10 +609,12 @@  int64_t block_copy_reset_unallocated(BlockCopyState *s,
     bytes = clusters * s->cluster_size;
 
     if (!ret) {
+        qemu_co_mutex_lock(&s->lock);
         bdrv_reset_dirty_bitmap(s->copy_bitmap, offset, bytes);
         progress_set_remaining(s->progress,
                                bdrv_get_dirty_count(s->copy_bitmap) +
                                s->in_flight_bytes);
+        qemu_co_mutex_unlock(&s->lock);
     }
 
     *count = bytes;
@@ -734,14 +754,33 @@  static int coroutine_fn block_copy_common(BlockCopyCallState *call_state)
     int ret;
     BlockCopyState *s = call_state->s;
 
+    qemu_co_mutex_lock(&s->lock);
     QLIST_INSERT_HEAD(&s->calls, call_state, list);
+    qemu_co_mutex_unlock(&s->lock);
 
     do {
         ret = block_copy_dirty_clusters(call_state);
 
         if (ret == 0 && !call_state->cancelled) {
-            ret = block_copy_wait_one(s, call_state->offset,
-                                      call_state->bytes);
+            WITH_QEMU_LOCK_GUARD(&s->lock) {
+                /*
+                 * Check that there is no task we still need to
+                 * wait to complete
+                 */
+                ret = block_copy_wait_one(s, call_state->offset,
+                                          call_state->bytes);
+                if (ret == 0) {
+                    /*
+                     * No pending tasks, but check again the bitmap in this
+                     * same critical section, since a task might have failed
+                     * between this and the critical section in
+                     * block_copy_dirty_clusters().
+                     */
+                    ret = bdrv_dirty_bitmap_next_dirty(s->copy_bitmap,
+                                                       call_state->offset,
+                                                       call_state->bytes) > 0;
+                }
+            }
         }
 
         /*
@@ -761,7 +800,9 @@  static int coroutine_fn block_copy_common(BlockCopyCallState *call_state)
         call_state->cb(call_state->cb_opaque);
     }
 
+    qemu_co_mutex_lock(&s->lock);
     QLIST_REMOVE(call_state, list);
+    qemu_co_mutex_unlock(&s->lock);
 
     return ret;
 }
diff --git a/include/block/block-copy.h b/include/block/block-copy.h
index 338f2ea7fd..bf8f0c679f 100644
--- a/include/block/block-copy.h
+++ b/include/block/block-copy.h
@@ -18,6 +18,8 @@ 
 #include "block/block.h"
 #include "qemu/co-shared-resource.h"
 
+/* All APIs are thread-safe*/
+
 typedef void (*BlockCopyAsyncCallbackFunc)(void *opaque);
 typedef struct BlockCopyState BlockCopyState;
 typedef struct BlockCopyCallState BlockCopyCallState;