diff mbox series

[v2,4/8] qcow2-threads: split out generic path

Message ID 20181211164317.32893-5-vsementsov@virtuozzo.com (mailing list archive)
State New, archived
Series qcow2: encryption threads

Commit Message

Vladimir Sementsov-Ogievskiy Dec. 11, 2018, 4:43 p.m. UTC
Move the generic part out of qcow2_co_do_compress so that it can be
reused for encryption, and rename the things that will be shared with
the encryption path.

Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
---
 block/qcow2.h         |  4 ++--
 block/qcow2-threads.c | 39 +++++++++++++++++++++++++++------------
 block/qcow2.c         |  2 +-
 3 files changed, 30 insertions(+), 15 deletions(-)

Comments

Paolo Bonzini Dec. 13, 2018, 11:28 p.m. UTC | #1
On 11/12/18 17:43, Vladimir Sementsov-Ogievskiy wrote:
> +    ThreadPool *pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
> +
> +    while (s->nb_threads >= QCOW2_MAX_THREADS) {
> +        qemu_co_queue_wait(&s->thread_task_queue, NULL);
> +    }
> +
> +    s->nb_threads++;
> +    ret = thread_pool_submit_co(pool, func, arg);
> +    s->nb_threads--;
> +
> +    qemu_co_queue_next(&s->thread_task_queue);

What lock is protecting this manipulation?  I'd rather avoid adding more
users of the AioContext lock, especially manipulation of CoQueues.

One possibility is to do the s->lock unlock/lock here, and then also
rely on that in patch 8.

Paolo
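
[Editor's note: the unlock/lock shape Paolo suggests could look roughly like
this. This is a hypothetical sketch, not part of the patch as posted; it
relies on qemu_co_queue_wait() atomically releasing the given lock while the
coroutine sleeps, so that both the counter and the CoQueue are only ever
touched under s->lock.]

```c
/* Hypothetical sketch of qcow2_co_process() with s->lock protecting
 * nb_threads and thread_task_queue instead of the AioContext lock. */
qemu_co_mutex_lock(&s->lock);
while (s->nb_threads >= QCOW2_MAX_THREADS) {
    /* atomically drops s->lock while waiting, re-takes it on wakeup */
    qemu_co_queue_wait(&s->thread_task_queue, &s->lock);
}
s->nb_threads++;
qemu_co_mutex_unlock(&s->lock);

/* the actual work runs outside the lock */
ret = thread_pool_submit_co(pool, func, arg);

qemu_co_mutex_lock(&s->lock);
s->nb_threads--;
qemu_co_queue_next(&s->thread_task_queue);
qemu_co_mutex_unlock(&s->lock);
```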
Vladimir Sementsov-Ogievskiy Dec. 14, 2018, 8:51 a.m. UTC | #2
14.12.2018 2:28, Paolo Bonzini wrote:
> On 11/12/18 17:43, Vladimir Sementsov-Ogievskiy wrote:
>> +    ThreadPool *pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
>> +
>> +    while (s->nb_threads >= QCOW2_MAX_THREADS) {
>> +        qemu_co_queue_wait(&s->thread_task_queue, NULL);
>> +    }
>> +
>> +    s->nb_threads++;
>> +    ret = thread_pool_submit_co(pool, func, arg);
>> +    s->nb_threads--;
>> +
>> +    qemu_co_queue_next(&s->thread_task_queue);
> 
> What lock is protecting this manipulation?  I'd rather avoid adding more
> users of the AioContext lock, especially manipulation of CoQueues.
> 
> One possibility is to do the s->lock unlock/lock here, and then also
> rely on that in patch 8.
> 
> Paolo
> 

Hm, can you please give more details? This is a refactoring commit, so do you
mean that there is an existing bug? We are in a coroutine, so is it not safe
to call qemu_co_queue_next?

I feel I don't understand the future without the AioContext lock, and how it
influences contexts/threads/coroutines. So, in this context, could
s->thread_task_queue be accessed from another thread while we are here,
calling qemu_co_queue_next from our AioContext? But from where? /Sorry for
possibly stupid questions/
Alberto Garcia Jan. 4, 2019, 2:59 p.m. UTC | #3
On Fri 14 Dec 2018 09:51:12 AM CET, Vladimir Sementsov-Ogievskiy wrote:
> 14.12.2018 2:28, Paolo Bonzini wrote:
>> On 11/12/18 17:43, Vladimir Sementsov-Ogievskiy wrote:
>>> +    ThreadPool *pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
>>> +
>>> +    while (s->nb_threads >= QCOW2_MAX_THREADS) {
>>> +        qemu_co_queue_wait(&s->thread_task_queue, NULL);
>>> +    }
>>> +
>>> +    s->nb_threads++;
>>> +    ret = thread_pool_submit_co(pool, func, arg);
>>> +    s->nb_threads--;
>>> +
>>> +    qemu_co_queue_next(&s->thread_task_queue);
>> 
>> What lock is protecting this manipulation?  I'd rather avoid adding more
>> users of the AioContext lock, especially manipulation of CoQueues.
>> 
>> One possibility is to do the s->lock unlock/lock here, and then also
>> rely on that in patch 8.
>
> Hm, can you please give more details? It's refactoring commit, so, do
> you mean that there is an existing bug?

I think there's an existing problem, but not a currently reproducible bug
as such: accesses to the qcow2 BlockDriverState are protected by the
same AioContext lock, but as soon as you want to implement multiqueue
you'll have multiple iothreads (and contexts) accessing the same BDS.

For that to work, the in-memory qcow2 metadata needs to be properly
locked, and that's what s->lock is for. However, this CoQueue here is not
protected by any lock (s->lock is taken _after_ the data is compressed
in qcow2_co_pwritev_compressed()).

And I think that's it in a nutshell, Paolo correct me if I'm wrong.

Berto

Patch

diff --git a/block/qcow2.h b/block/qcow2.h
index be84d7c96a..9c2f6749ba 100644
--- a/block/qcow2.h
+++ b/block/qcow2.h
@@ -336,8 +336,8 @@  typedef struct BDRVQcow2State {
     char *image_backing_file;
     char *image_backing_format;
 
-    CoQueue compress_wait_queue;
-    int nb_compress_threads;
+    CoQueue thread_task_queue;
+    int nb_threads;
 } BDRVQcow2State;
 
 typedef struct Qcow2COWRegion {
diff --git a/block/qcow2-threads.c b/block/qcow2-threads.c
index 8c10191f2f..84b3ede4f1 100644
--- a/block/qcow2-threads.c
+++ b/block/qcow2-threads.c
@@ -31,7 +31,32 @@ 
 #include "qcow2.h"
 #include "block/thread-pool.h"
 
-#define MAX_COMPRESS_THREADS 4
+#define QCOW2_MAX_THREADS 4
+
+static int coroutine_fn
+qcow2_co_process(BlockDriverState *bs, ThreadPoolFunc *func, void *arg)
+{
+    int ret;
+    BDRVQcow2State *s = bs->opaque;
+    ThreadPool *pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
+
+    while (s->nb_threads >= QCOW2_MAX_THREADS) {
+        qemu_co_queue_wait(&s->thread_task_queue, NULL);
+    }
+
+    s->nb_threads++;
+    ret = thread_pool_submit_co(pool, func, arg);
+    s->nb_threads--;
+
+    qemu_co_queue_next(&s->thread_task_queue);
+
+    return ret;
+}
+
+
+/*
+ * Compression
+ */
 
 typedef ssize_t (*Qcow2CompressFunc)(void *dest, size_t dest_size,
                                      const void *src, size_t src_size);
@@ -144,8 +169,6 @@  static ssize_t coroutine_fn
 qcow2_co_do_compress(BlockDriverState *bs, void *dest, size_t dest_size,
                      const void *src, size_t src_size, Qcow2CompressFunc func)
 {
-    BDRVQcow2State *s = bs->opaque;
-    ThreadPool *pool = aio_get_thread_pool(bdrv_get_aio_context(bs));
     Qcow2CompressData arg = {
         .dest = dest,
         .dest_size = dest_size,
@@ -154,15 +177,7 @@  qcow2_co_do_compress(BlockDriverState *bs, void *dest, size_t dest_size,
         .func = func,
     };
 
-    while (s->nb_compress_threads >= MAX_COMPRESS_THREADS) {
-        qemu_co_queue_wait(&s->compress_wait_queue, NULL);
-    }
-
-    s->nb_compress_threads++;
-    thread_pool_submit_co(pool, qcow2_compress_pool_func, &arg);
-    s->nb_compress_threads--;
-
-    qemu_co_queue_next(&s->compress_wait_queue);
+    qcow2_co_process(bs, qcow2_compress_pool_func, &arg);
 
     return arg.ret;
 }
diff --git a/block/qcow2.c b/block/qcow2.c
index e61dc54fd0..c4b716d4f6 100644
--- a/block/qcow2.c
+++ b/block/qcow2.c
@@ -1600,7 +1600,7 @@  static int coroutine_fn qcow2_do_open(BlockDriverState *bs, QDict *options,
     }
 #endif
 
-    qemu_co_queue_init(&s->compress_wait_queue);
+    qemu_co_queue_init(&s->thread_task_queue);
 
     return ret;