diff mbox series

[v2,10/17] migration/multifd: Convert multifd_send()::next_channel to atomic

Message ID 76dc3ad69fa457fd1e358ad3de874474f9f64716.1724701542.git.maciej.szmigiero@oracle.com (mailing list archive)
State New
Headers show
Series Multifd | expand

Commit Message

Maciej S. Szmigiero Aug. 27, 2024, 5:54 p.m. UTC
From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>

This is necessary for multifd_send() to be able to be called
from multiple threads.

Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
---
 migration/multifd.c | 24 ++++++++++++++++++------
 1 file changed, 18 insertions(+), 6 deletions(-)

Comments

Fabiano Rosas Aug. 30, 2024, 6:13 p.m. UTC | #1
"Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:

> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>
> This is necessary for multifd_send() to be able to be called
> from multiple threads.
>
> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
> ---
>  migration/multifd.c | 24 ++++++++++++++++++------
>  1 file changed, 18 insertions(+), 6 deletions(-)
>
> diff --git a/migration/multifd.c b/migration/multifd.c
> index d5a8e5a9c9b5..b25789dde0b3 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -343,26 +343,38 @@ bool multifd_send(MultiFDSendData **send_data)
>          return false;
>      }
>  
> -    /* We wait here, until at least one channel is ready */
> -    qemu_sem_wait(&multifd_send_state->channels_ready);
> -
>      /*
>       * next_channel can remain from a previous migration that was
>       * using more channels, so ensure it doesn't overflow if the
>       * limit is lower now.
>       */
> -    next_channel %= migrate_multifd_channels();
> -    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
> +    i = qatomic_load_acquire(&next_channel);
> +    if (unlikely(i >= migrate_multifd_channels())) {
> +        qatomic_cmpxchg(&next_channel, i, 0);
> +    }

Do we still need this? It seems not, because the mod down below would
already truncate to a value less than the number of channels. We don't
need it to start at 0 always, the channels are equivalent.

> +
> +    /* We wait here, until at least one channel is ready */
> +    qemu_sem_wait(&multifd_send_state->channels_ready);
> +
> +    while (true) {
> +        int i_next;
> +
>          if (multifd_send_should_exit()) {
>              return false;
>          }
> +
> +        i = qatomic_load_acquire(&next_channel);
> +        i_next = (i + 1) % migrate_multifd_channels();
> +        if (qatomic_cmpxchg(&next_channel, i, i_next) != i) {
> +            continue;
> +        }

Say channel 'i' is the only one that's idle. What's stopping the other
thread(s) to race at this point and loop around to the same index?

> +
>          p = &multifd_send_state->params[i];
>          /*
>           * Lockless read to p->pending_job is safe, because only multifd
>           * sender thread can clear it.
>           */
>          if (qatomic_read(&p->pending_job) == false) {

With the cmpxchg your other patch adds here, then the race I mentioned
above should be harmless. But we'd need to bring that code into this
patch.

> -            next_channel = (i + 1) % migrate_multifd_channels();
>              break;
>          }
>      }
Maciej S. Szmigiero Sept. 2, 2024, 8:11 p.m. UTC | #2
On 30.08.2024 20:13, Fabiano Rosas wrote:
> "Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:
> 
>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>
>> This is necessary for multifd_send() to be able to be called
>> from multiple threads.
>>
>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>> ---
>>   migration/multifd.c | 24 ++++++++++++++++++------
>>   1 file changed, 18 insertions(+), 6 deletions(-)
>>
>> diff --git a/migration/multifd.c b/migration/multifd.c
>> index d5a8e5a9c9b5..b25789dde0b3 100644
>> --- a/migration/multifd.c
>> +++ b/migration/multifd.c
>> @@ -343,26 +343,38 @@ bool multifd_send(MultiFDSendData **send_data)
>>           return false;
>>       }
>>   
>> -    /* We wait here, until at least one channel is ready */
>> -    qemu_sem_wait(&multifd_send_state->channels_ready);
>> -
>>       /*
>>        * next_channel can remain from a previous migration that was
>>        * using more channels, so ensure it doesn't overflow if the
>>        * limit is lower now.
>>        */
>> -    next_channel %= migrate_multifd_channels();
>> -    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
>> +    i = qatomic_load_acquire(&next_channel);
>> +    if (unlikely(i >= migrate_multifd_channels())) {
>> +        qatomic_cmpxchg(&next_channel, i, 0);
>> +    }
> 
> Do we still need this? It seems not, because the mod down below would
> already truncate to a value less than the number of channels. We don't
> need it to start at 0 always, the channels are equivalent.

The "modulo" operation below forces i_next to be in the proper range,
not i.

If the qatomic_cmpxchg() ends up succeeding then we use the (now out of
bounds) i value to index multifd_send_state->params[].

>> +
>> +    /* We wait here, until at least one channel is ready */
>> +    qemu_sem_wait(&multifd_send_state->channels_ready);
>> +
>> +    while (true) {
>> +        int i_next;
>> +
>>           if (multifd_send_should_exit()) {
>>               return false;
>>           }
>> +
>> +        i = qatomic_load_acquire(&next_channel);
>> +        i_next = (i + 1) % migrate_multifd_channels();
>> +        if (qatomic_cmpxchg(&next_channel, i, i_next) != i) {
>> +            continue;
>> +        }
> 
> Say channel 'i' is the only one that's idle. What's stopping the other
> thread(s) to race at this point and loop around to the same index?

See the reply below.

>> +
>>           p = &multifd_send_state->params[i];
>>           /*
>>            * Lockless read to p->pending_job is safe, because only multifd
>>            * sender thread can clear it.
>>            */
>>           if (qatomic_read(&p->pending_job) == false) {
> 
> With the cmpxchg your other patch adds here, then the race I mentioned
> above should be harmless. But we'd need to bring that code into this
> patch.
> 

You're right - the sender code with this patch alone isn't thread safe
yet but this commit is only literally about "converting
multifd_send()::next_channel to atomic".

At the time of this patch there aren't any multifd_send() calls from
multiple threads, and the commit that introduces such possible call
site (multifd_queue_device_state()) also modifies multifd_send()
to be fully thread safe by introducing p->pending_job_preparing.

Thanks,
Maciej
Fabiano Rosas Sept. 3, 2024, 3:01 p.m. UTC | #3
"Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:

> On 30.08.2024 20:13, Fabiano Rosas wrote:
>> "Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:
>> 
>>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>>
>>> This is necessary for multifd_send() to be able to be called
>>> from multiple threads.
>>>
>>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>>> ---
>>>   migration/multifd.c | 24 ++++++++++++++++++------
>>>   1 file changed, 18 insertions(+), 6 deletions(-)
>>>
>>> diff --git a/migration/multifd.c b/migration/multifd.c
>>> index d5a8e5a9c9b5..b25789dde0b3 100644
>>> --- a/migration/multifd.c
>>> +++ b/migration/multifd.c
>>> @@ -343,26 +343,38 @@ bool multifd_send(MultiFDSendData **send_data)
>>>           return false;
>>>       }
>>>   
>>> -    /* We wait here, until at least one channel is ready */
>>> -    qemu_sem_wait(&multifd_send_state->channels_ready);
>>> -
>>>       /*
>>>        * next_channel can remain from a previous migration that was
>>>        * using more channels, so ensure it doesn't overflow if the
>>>        * limit is lower now.
>>>        */
>>> -    next_channel %= migrate_multifd_channels();
>>> -    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
>>> +    i = qatomic_load_acquire(&next_channel);
>>> +    if (unlikely(i >= migrate_multifd_channels())) {
>>> +        qatomic_cmpxchg(&next_channel, i, 0);
>>> +    }
>> 
>> Do we still need this? It seems not, because the mod down below would
>> already truncate to a value less than the number of channels. We don't
>> need it to start at 0 always, the channels are equivalent.
>
> The "modulo" operation below forces i_next to be in the proper range,
> not i.
>
> If the qatomic_cmpxchg() ends up succeeding then we use the (now out of
> bounds) i value to index multifd_send_state->params[].

Indeed.

>
>>> +
>>> +    /* We wait here, until at least one channel is ready */
>>> +    qemu_sem_wait(&multifd_send_state->channels_ready);
>>> +
>>> +    while (true) {
>>> +        int i_next;
>>> +
>>>           if (multifd_send_should_exit()) {
>>>               return false;
>>>           }
>>> +
>>> +        i = qatomic_load_acquire(&next_channel);
>>> +        i_next = (i + 1) % migrate_multifd_channels();
>>> +        if (qatomic_cmpxchg(&next_channel, i, i_next) != i) {
>>> +            continue;
>>> +        }
>> 
>> Say channel 'i' is the only one that's idle. What's stopping the other
>> thread(s) to race at this point and loop around to the same index?
>
> See the reply below.
>
>>> +
>>>           p = &multifd_send_state->params[i];
>>>           /*
>>>            * Lockless read to p->pending_job is safe, because only multifd
>>>            * sender thread can clear it.
>>>            */
>>>           if (qatomic_read(&p->pending_job) == false) {
>> 
>> With the cmpxchg your other patch adds here, then the race I mentioned
>> above should be harmless. But we'd need to bring that code into this
>> patch.
>> 
>
> You're right - the sender code with this patch alone isn't thread safe
> yet but this commit is only literally about "converting
> multifd_send()::next_channel to atomic".
>
> At the time of this patch there aren't any multifd_send() calls from
> multiple threads, and the commit that introduces such possible call
> site (multifd_queue_device_state()) also modifies multifd_send()
> to be fully thread safe by introducing p->pending_job_preparing.

In general this would be a bad practice because this commit can end up
being moved around due to backporting or bisecting. It would be better
if it were complete from the start. It also affects backporting due to
ambiguity on where the Fixes tag should point to if someone eventually
finds a bug.

I already asked you to extract the other code into a separate patch, so
it's not that bad. If you prefer to keep both changes separate for
clarity, please note on the commit message that the next patch is
necessary for correctness.

>
> Thanks,
> Maciej
Maciej S. Szmigiero Sept. 3, 2024, 8:04 p.m. UTC | #4
On 3.09.2024 17:01, Fabiano Rosas wrote:
> "Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:
> 
>> On 30.08.2024 20:13, Fabiano Rosas wrote:
>>> "Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:
>>>
>>>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>>>
>>>> This is necessary for multifd_send() to be able to be called
>>>> from multiple threads.
>>>>
>>>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>>>> ---
>>>>    migration/multifd.c | 24 ++++++++++++++++++------
>>>>    1 file changed, 18 insertions(+), 6 deletions(-)
>>>>
>>>> diff --git a/migration/multifd.c b/migration/multifd.c
>>>> index d5a8e5a9c9b5..b25789dde0b3 100644
>>>> --- a/migration/multifd.c
>>>> +++ b/migration/multifd.c
(..)
>>>> +
>>>> +    /* We wait here, until at least one channel is ready */
>>>> +    qemu_sem_wait(&multifd_send_state->channels_ready);
>>>> +
>>>> +    while (true) {
>>>> +        int i_next;
>>>> +
>>>>            if (multifd_send_should_exit()) {
>>>>                return false;
>>>>            }
>>>> +
>>>> +        i = qatomic_load_acquire(&next_channel);
>>>> +        i_next = (i + 1) % migrate_multifd_channels();
>>>> +        if (qatomic_cmpxchg(&next_channel, i, i_next) != i) {
>>>> +            continue;
>>>> +        }
>>>
>>> Say channel 'i' is the only one that's idle. What's stopping the other
>>> thread(s) to race at this point and loop around to the same index?
>>
>> See the reply below.
>>
>>>> +
>>>>            p = &multifd_send_state->params[i];
>>>>            /*
>>>>             * Lockless read to p->pending_job is safe, because only multifd
>>>>             * sender thread can clear it.
>>>>             */
>>>>            if (qatomic_read(&p->pending_job) == false) {
>>>
>>> With the cmpxchg your other patch adds here, then the race I mentioned
>>> above should be harmless. But we'd need to bring that code into this
>>> patch.
>>>
>>
>> You're right - the sender code with this patch alone isn't thread safe
>> yet but this commit is only literally about "converting
>> multifd_send()::next_channel to atomic".
>>
>> At the time of this patch there aren't any multifd_send() calls from
>> multiple threads, and the commit that introduces such possible call
>> site (multifd_queue_device_state()) also modifies multifd_send()
>> to be fully thread safe by introducing p->pending_job_preparing.
> 
> In general this would be a bad practice because this commit can end up
> being moved around due to backporting or bisecting. It would be better
> if it were complete from the start. It also affects backporting due to
> ambiguity on where the Fixes tag should point to if someone eventually
> finds a bug.
> 
> I already asked you to extract the other code into a separate patch, so
> it's not that bad. If you prefer to keep both changes separate for
> clarity, please note on the commit message that the next patch is
> necessary for correctness.
> 

If someone picks parts of a patch set or reorders commits then I guess
in many cases things can break indeed.

But it looks like I will be able to move code changes around to have
multifd_send() already thread safe by the time of this commit so I
will do that.

Thanks,
Maciej
Peter Xu Sept. 10, 2024, 2:13 p.m. UTC | #5
On Tue, Aug 27, 2024 at 07:54:29PM +0200, Maciej S. Szmigiero wrote:
> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
> 
> This is necessary for multifd_send() to be able to be called
> from multiple threads.
> 
> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>

Would it be much simpler to just use a mutex for enqueue?

Something like:

===8<===
diff --git a/migration/multifd.c b/migration/multifd.c
index 9b200f4ad9..979c9748b5 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -69,6 +69,8 @@ struct {
     QemuSemaphore channels_created;
     /* send channels ready */
     QemuSemaphore channels_ready;
+    /* Mutex to serialize multifd enqueues */
+    QemuMutex multifd_send_mutex;
     /*
      * Have we already run terminate threads.  There is a race when it
      * happens that we got one error while we are exiting.
@@ -305,6 +307,8 @@ bool multifd_send(MultiFDSendData **send_data)
     MultiFDSendParams *p = NULL; /* make happy gcc */
     MultiFDSendData *tmp;
 
+    QEMU_LOCK_GUARD(&multifd_send_mutex);
+
     if (multifd_send_should_exit()) {
         return false;
     }
@@ -824,6 +828,7 @@ bool multifd_send_setup(void)
     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
     qemu_sem_init(&multifd_send_state->channels_created, 0);
     qemu_sem_init(&multifd_send_state->channels_ready, 0);
+    qemu_mutex_init(&multifd_send_state->multifd_send_mutex);
     qatomic_set(&multifd_send_state->exiting, 0);
     multifd_send_state->ops = multifd_ops[migrate_multifd_compression()];
===8<===

Then all the details doesn't need change (meanwhile the perf should be
similar)?
diff mbox series

Patch

diff --git a/migration/multifd.c b/migration/multifd.c
index d5a8e5a9c9b5..b25789dde0b3 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -343,26 +343,38 @@  bool multifd_send(MultiFDSendData **send_data)
         return false;
     }
 
-    /* We wait here, until at least one channel is ready */
-    qemu_sem_wait(&multifd_send_state->channels_ready);
-
     /*
      * next_channel can remain from a previous migration that was
      * using more channels, so ensure it doesn't overflow if the
      * limit is lower now.
      */
-    next_channel %= migrate_multifd_channels();
-    for (i = next_channel;; i = (i + 1) % migrate_multifd_channels()) {
+    i = qatomic_load_acquire(&next_channel);
+    if (unlikely(i >= migrate_multifd_channels())) {
+        qatomic_cmpxchg(&next_channel, i, 0);
+    }
+
+    /* We wait here, until at least one channel is ready */
+    qemu_sem_wait(&multifd_send_state->channels_ready);
+
+    while (true) {
+        int i_next;
+
         if (multifd_send_should_exit()) {
             return false;
         }
+
+        i = qatomic_load_acquire(&next_channel);
+        i_next = (i + 1) % migrate_multifd_channels();
+        if (qatomic_cmpxchg(&next_channel, i, i_next) != i) {
+            continue;
+        }
+
         p = &multifd_send_state->params[i];
         /*
          * Lockless read to p->pending_job is safe, because only multifd
          * sender thread can clear it.
          */
         if (qatomic_read(&p->pending_job) == false) {
-            next_channel = (i + 1) % migrate_multifd_channels();
             break;
         }
     }