[v7,2/5] xen/rcu: don't use stop_machine_run() for rcu_barrier()

Message ID: 20200325105511.20882-3-jgross@suse.com (mailing list archive)
State: Superseded
Series: xen/rcu: let rcu work better with core scheduling

Commit Message

Jürgen Groß March 25, 2020, 10:55 a.m. UTC
Today rcu_barrier() calls stop_machine_run() to synchronize all
physical cpus in order to ensure all pending rcu calls have finished
when it returns.

As stop_machine_run() is using tasklets, it requires scheduling of idle
vcpus on all cpus. With core scheduling active this means rcu_barrier()
may be called on idle cpus only, as otherwise a scheduling deadlock
would occur.

There is no need at all to do the syncing of the cpus in tasklets, as
rcu activity is started in __do_softirq(), which is called whenever
softirq activity is allowed. So rcu_barrier() can easily be modified to
use softirq for the synchronization of the cpus, no longer requiring
any scheduling activity.

As there already is an rcu softirq, reuse it for the synchronization.

Remove the barrier element from struct rcu_data as it is no longer
used.

Finally switch rcu_barrier() to return void, as it can now never fail.

Partially-based-on-patch-by: Igor Druzhinin <igor.druzhinin@citrix.com>
Signed-off-by: Juergen Gross <jgross@suse.com>
---
V2:
- add recursion detection

V3:
- fix races (Igor Druzhinin)

V5:
- rename done_count to pending_count (Jan Beulich)
- fix race (Jan Beulich)

V6:
- add barrier (Julien Grall)
- add ASSERT() (Julien Grall)
- hold cpu_map lock until end of rcu_barrier() (Julien Grall)

V7:
- update comment (Jan Beulich)
- add barriers (Jan Beulich)
---
 xen/common/rcupdate.c      | 100 +++++++++++++++++++++++++++++++++------------
 xen/include/xen/rcupdate.h |   2 +-
 2 files changed, 74 insertions(+), 28 deletions(-)
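
For orientation, the following is a minimal user-space sketch of the two-counter
handshake introduced by the patch, using C11 atomics and one thread per cpu in
place of the RCU softirq. It only illustrates the cpu_count/pending_count
protocol; N_CPUS, worker() and main() are illustrative stand-ins and none of
this is Xen code.

/* barrier_sketch.c - build with: cc -std=c11 -pthread barrier_sketch.c */
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <stdio.h>

#define N_CPUS 4

static atomic_int cpu_count;      /* cpus still owing their "rcu callback" */
static atomic_int pending_count;  /* barrier lifetime: n_cpus + 1 ... 0 */

/* One worker per "cpu": stands in for the RCU softirq handling. */
static void *worker(void *arg)
{
    (void)arg;

    /* ... all previously queued RCU work of this cpu would finish here ... */
    atomic_fetch_sub(&cpu_count, 1);           /* rcu_barrier_callback() */

    while ( atomic_load(&cpu_count) != 0 )     /* wait for all other cpus */
        sched_yield();                         /* ~ process_pending_softirqs() */

    atomic_fetch_sub(&pending_count, 1);       /* end of rcu_barrier_action() */
    return NULL;
}

int main(void)
{
    pthread_t t[N_CPUS];
    int expected = 0;

    /* rcu_barrier() entry: claim the barrier, pending_count 0 -> n_cpus + 1. */
    while ( !atomic_compare_exchange_strong(&pending_count, &expected,
                                            N_CPUS + 1) )
        expected = 0;                          /* another barrier is in flight */

    atomic_store(&cpu_count, N_CPUS);          /* arm the per-cpu callbacks */
    for ( int i = 0; i < N_CPUS; i++ )         /* ~ cpumask_raise_softirq() */
        pthread_create(&t[i], NULL, worker, NULL);

    while ( atomic_load(&pending_count) != 1 ) /* all cpus saw cpu_count == 0 */
        sched_yield();

    atomic_store(&pending_count, 0);           /* no rcu_barrier() running */

    for ( int i = 0; i < N_CPUS; i++ )
        pthread_join(t[i], NULL);
    puts("barrier complete");

    return 0;
}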

Comments

Jan Beulich March 25, 2020, 1:19 p.m. UTC | #1
On 25.03.2020 11:55, Juergen Gross wrote:
> Today rcu_barrier() is calling stop_machine_run() to synchronize all
> physical cpus in order to ensure all pending rcu calls have finished
> when returning.
> 
> As stop_machine_run() is using tasklets this requires scheduling of
> idle vcpus on all cpus imposing the need to call rcu_barrier() on idle
> cpus only in case of core scheduling being active, as otherwise a
> scheduling deadlock would occur.
> 
> There is no need at all to do the syncing of the cpus in tasklets, as
> rcu activity is started in __do_softirq() called whenever softirq
> activity is allowed. So rcu_barrier() can easily be modified to use
> softirq for synchronization of the cpus no longer requiring any
> scheduling activity.
> 
> As there already is a rcu softirq reuse that for the synchronization.
> 
> Remove the barrier element from struct rcu_data as it isn't used.
> 
> Finally switch rcu_barrier() to return void as it now can never fail.
> 
> Partially-based-on-patch-by: Igor Druzhinin <igor.druzhinin@citrix.com>
> Signed-off-by: Juergen Gross <jgross@suse.com>

Reviewed-by: Jan Beulich <jbeulich@suse.com>
Julien Grall March 25, 2020, 4:13 p.m. UTC | #2
Hi Juergen,

On 25/03/2020 10:55, Juergen Gross wrote:
> Today rcu_barrier() is calling stop_machine_run() to synchronize all
> physical cpus in order to ensure all pending rcu calls have finished
> when returning.
> 
> As stop_machine_run() is using tasklets this requires scheduling of
> idle vcpus on all cpus imposing the need to call rcu_barrier() on idle
> cpus only in case of core scheduling being active, as otherwise a
> scheduling deadlock would occur.
> 
> There is no need at all to do the syncing of the cpus in tasklets, as
> rcu activity is started in __do_softirq() called whenever softirq
> activity is allowed. So rcu_barrier() can easily be modified to use
> softirq for synchronization of the cpus no longer requiring any
> scheduling activity.
> 
> As there already is a rcu softirq reuse that for the synchronization.
> 
> Remove the barrier element from struct rcu_data as it isn't used.
> 
> Finally switch rcu_barrier() to return void as it now can never fail.
> 
> Partially-based-on-patch-by: Igor Druzhinin <igor.druzhinin@citrix.com>
> Signed-off-by: Juergen Gross <jgross@suse.com>
> ---
> V2:
> - add recursion detection
> 
> V3:
> - fix races (Igor Druzhinin)
> 
> V5:
> - rename done_count to pending_count (Jan Beulich)
> - fix race (Jan Beulich)
> 
> V6:
> - add barrier (Julien Grall)
> - add ASSERT() (Julien Grall)
> - hold cpu_map lock until end of rcu_barrier() (Julien Grall)
> 
> V7:
> - update comment (Jan Beulich)
> - add barriers (Jan Beulich)
> ---
>   xen/common/rcupdate.c      | 100 +++++++++++++++++++++++++++++++++------------
>   xen/include/xen/rcupdate.h |   2 +-
>   2 files changed, 74 insertions(+), 28 deletions(-)
> 
> diff --git a/xen/common/rcupdate.c b/xen/common/rcupdate.c
> index 03d84764d2..12b89565d0 100644
> --- a/xen/common/rcupdate.c
> +++ b/xen/common/rcupdate.c
> @@ -83,7 +83,6 @@ struct rcu_data {
>       struct rcu_head **donetail;
>       long            blimit;           /* Upper limit on a processed batch */
>       int cpu;
> -    struct rcu_head barrier;
>       long            last_rs_qlen;     /* qlen during the last resched */
>   
>       /* 3) idle CPUs handling */
> @@ -91,6 +90,7 @@ struct rcu_data {
>       bool idle_timer_active;
>   
>       bool            process_callbacks;
> +    bool            barrier_active;
>   };
>   
>   /*
> @@ -143,51 +143,90 @@ static int qhimark = 10000;
>   static int qlowmark = 100;
>   static int rsinterval = 1000;
>   
> -struct rcu_barrier_data {
> -    struct rcu_head head;
> -    atomic_t *cpu_count;
> -};
> +/*
> + * rcu_barrier() handling:
> + * Two counters are used to synchronize rcu_barrier() work:
> + * - cpu_count holds the number of cpus required to finish barrier handling.
> + *   It is decremented by each cpu when it has performed all pending rcu calls.
> + * - pending_count shows whether any rcu_barrier() activity is running and
> + *   it is used to synchronize leaving rcu_barrier() only after all cpus
> + *   have finished their processing. pending_count is initialized to nr_cpus + 1
> + *   and it is decremented by each cpu when it has seen that cpu_count has
> + *   reached 0. The cpu where rcu_barrier() has been called will wait until
> + *   pending_count has been decremented to 1 (so all cpus have seen cpu_count
> + *   reaching 0) and will then set pending_count to 0 indicating there is no
> + *   rcu_barrier() running.
> + * Cpus are synchronized via softirq mechanism. rcu_barrier() is regarded to
> + * be active if pending_count is not zero. In case rcu_barrier() is called on
> + * multiple cpus it is enough to check for pending_count being not zero on entry
> + * and to call process_pending_softirqs() in a loop until pending_count drops to
> + * zero, before starting the new rcu_barrier() processing.
> + */
> +static atomic_t cpu_count = ATOMIC_INIT(0);
> +static atomic_t pending_count = ATOMIC_INIT(0);
>   
>   static void rcu_barrier_callback(struct rcu_head *head)
>   {
> -    struct rcu_barrier_data *data = container_of(
> -        head, struct rcu_barrier_data, head);
> -    atomic_inc(data->cpu_count);
> +    smp_mb__before_atomic();     /* Make all writes visible to other cpus. */

smp_mb__before_atomic() will order both reads and writes. However, the
comment suggests only the writes are required to be ordered.

So either the barrier is too strong or the comment is incorrect. Can you 
clarify it?

> +    atomic_dec(&cpu_count);
>   }
>   
> -static int rcu_barrier_action(void *_cpu_count)
> +static void rcu_barrier_action(void)
>   {
> -    struct rcu_barrier_data data = { .cpu_count = _cpu_count };
> -
> -    ASSERT(!local_irq_is_enabled());
> -    local_irq_enable();
> +    struct rcu_head head;
>   
>       /*
>        * When callback is executed, all previously-queued RCU work on this CPU
> -     * is completed. When all CPUs have executed their callback, data.cpu_count
> -     * will have been incremented to include every online CPU.
> +     * is completed. When all CPUs have executed their callback, cpu_count
> +     * will have been decremented to 0.
>        */
> -    call_rcu(&data.head, rcu_barrier_callback);
> +    call_rcu(&head, rcu_barrier_callback);
>   
> -    while ( atomic_read(data.cpu_count) != num_online_cpus() )
> +    while ( atomic_read(&cpu_count) )
>       {
>           process_pending_softirqs();
>           cpu_relax();
>       }
>   
> -    local_irq_disable();
> -
> -    return 0;
> +    smp_mb__before_atomic();
> +    atomic_dec(&pending_count);
>   }
>   
> -/*
> - * As rcu_barrier() is using stop_machine_run() it is allowed to be used in
> - * idle context only (see comment for stop_machine_run()).
> - */
> -int rcu_barrier(void)
> +void rcu_barrier(void)
>   {
> -    atomic_t cpu_count = ATOMIC_INIT(0);
> -    return stop_machine_run(rcu_barrier_action, &cpu_count, NR_CPUS);
> +    unsigned int n_cpus;
> +
> +    ASSERT(!in_irq() && local_irq_is_enabled());
> +
> +    for ( ; ; )
> +    {
> +        if ( !atomic_read(&pending_count) && get_cpu_maps() )
> +        {
> +            n_cpus = num_online_cpus();
> +
> +            if ( atomic_cmpxchg(&pending_count, 0, n_cpus + 1) == 0 )
> +                break;
> +
> +            put_cpu_maps();
> +        }
> +
> +        process_pending_softirqs();
> +        cpu_relax();
> +    }
> +
> +    smp_mb__before_atomic();

Our semantics of atomic_cmpxchg() are exactly the same as Linux's, i.e. it
will contain a full barrier when the cmpxchg succeeds. So why do you need
this barrier?

> +    atomic_set(&cpu_count, n_cpus);
> +    cpumask_raise_softirq(&cpu_online_map, RCU_SOFTIRQ);
> +
> +    while ( atomic_read(&pending_count) != 1 )
> +    {
> +        process_pending_softirqs();
> +        cpu_relax();
> +    }
> +
> +    atomic_set(&pending_count, 0);
> +
> +    put_cpu_maps();
>   }
>   
>   /* Is batch a before batch b ? */
> @@ -426,6 +465,13 @@ static void rcu_process_callbacks(void)
>           rdp->process_callbacks = false;
>           __rcu_process_callbacks(&rcu_ctrlblk, rdp);
>       }
> +
> +    if ( atomic_read(&cpu_count) && !rdp->barrier_active )
> +    {
> +        rdp->barrier_active = true;
> +        rcu_barrier_action();
> +        rdp->barrier_active = false;
> +    }
>   }
>   
>   static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
> diff --git a/xen/include/xen/rcupdate.h b/xen/include/xen/rcupdate.h
> index eb9b60df07..31c8b86d13 100644
> --- a/xen/include/xen/rcupdate.h
> +++ b/xen/include/xen/rcupdate.h
> @@ -144,7 +144,7 @@ void rcu_check_callbacks(int cpu);
>   void call_rcu(struct rcu_head *head,
>                 void (*func)(struct rcu_head *head));
>   
> -int rcu_barrier(void);
> +void rcu_barrier(void);
>   
>   void rcu_idle_enter(unsigned int cpu);
>   void rcu_idle_exit(unsigned int cpu);
> 

Cheers,
Jan Beulich March 26, 2020, 6:58 a.m. UTC | #3
On 25.03.2020 17:13, Julien Grall wrote:
> On 25/03/2020 10:55, Juergen Gross wrote:
>> @@ -143,51 +143,90 @@ static int qhimark = 10000;
>>   static int qlowmark = 100;
>>   static int rsinterval = 1000;
>>   -struct rcu_barrier_data {
>> -    struct rcu_head head;
>> -    atomic_t *cpu_count;
>> -};
>> +/*
>> + * rcu_barrier() handling:
>> + * Two counters are used to synchronize rcu_barrier() work:
>> + * - cpu_count holds the number of cpus required to finish barrier handling.
>> + *   It is decremented by each cpu when it has performed all pending rcu calls.
>> + * - pending_count shows whether any rcu_barrier() activity is running and
>> + *   it is used to synchronize leaving rcu_barrier() only after all cpus
>> + *   have finished their processing. pending_count is initialized to nr_cpus + 1
>> + *   and it is decremented by each cpu when it has seen that cpu_count has
>> + *   reached 0. The cpu where rcu_barrier() has been called will wait until
>> + *   pending_count has been decremented to 1 (so all cpus have seen cpu_count
>> + *   reaching 0) and will then set pending_count to 0 indicating there is no
>> + *   rcu_barrier() running.
>> + * Cpus are synchronized via softirq mechanism. rcu_barrier() is regarded to
>> + * be active if pending_count is not zero. In case rcu_barrier() is called on
>> + * multiple cpus it is enough to check for pending_count being not zero on entry
>> + * and to call process_pending_softirqs() in a loop until pending_count drops to
>> + * zero, before starting the new rcu_barrier() processing.
>> + */
>> +static atomic_t cpu_count = ATOMIC_INIT(0);
>> +static atomic_t pending_count = ATOMIC_INIT(0);
>>     static void rcu_barrier_callback(struct rcu_head *head)
>>   {
>> -    struct rcu_barrier_data *data = container_of(
>> -        head, struct rcu_barrier_data, head);
>> -    atomic_inc(data->cpu_count);
>> +    smp_mb__before_atomic();     /* Make all writes visible to other cpus. */
> 
> smp_mb__before_atomic() will order both read and write. However, the
> comment suggest only the write are required to be ordered.
> 
> So either the barrier is too strong or the comment is incorrect. Can
> you clarify it?

Neither is the case, I guess: There simply is no smp_wmb__before_atomic()
in Linux, and if we want to follow their model we shouldn't have one
either. I'd rather take the comment to indicate that if one appeared, it
could be used here.

>> +    atomic_dec(&cpu_count);
>>   }
>>   -static int rcu_barrier_action(void *_cpu_count)
>> +static void rcu_barrier_action(void)
>>   {
>> -    struct rcu_barrier_data data = { .cpu_count = _cpu_count };
>> -
>> -    ASSERT(!local_irq_is_enabled());
>> -    local_irq_enable();
>> +    struct rcu_head head;
>>         /*
>>        * When callback is executed, all previously-queued RCU work on this CPU
>> -     * is completed. When all CPUs have executed their callback, data.cpu_count
>> -     * will have been incremented to include every online CPU.
>> +     * is completed. When all CPUs have executed their callback, cpu_count
>> +     * will have been decremented to 0.
>>        */
>> -    call_rcu(&data.head, rcu_barrier_callback);
>> +    call_rcu(&head, rcu_barrier_callback);
>>   -    while ( atomic_read(data.cpu_count) != num_online_cpus() )
>> +    while ( atomic_read(&cpu_count) )
>>       {
>>           process_pending_softirqs();
>>           cpu_relax();
>>       }
>>   -    local_irq_disable();
>> -
>> -    return 0;
>> +    smp_mb__before_atomic();
>> +    atomic_dec(&pending_count);
>>   }
>>   -/*
>> - * As rcu_barrier() is using stop_machine_run() it is allowed to be used in
>> - * idle context only (see comment for stop_machine_run()).
>> - */
>> -int rcu_barrier(void)
>> +void rcu_barrier(void)
>>   {
>> -    atomic_t cpu_count = ATOMIC_INIT(0);
>> -    return stop_machine_run(rcu_barrier_action, &cpu_count, NR_CPUS);
>> +    unsigned int n_cpus;
>> +
>> +    ASSERT(!in_irq() && local_irq_is_enabled());
>> +
>> +    for ( ; ; )
>> +    {
>> +        if ( !atomic_read(&pending_count) && get_cpu_maps() )
>> +        {
>> +            n_cpus = num_online_cpus();
>> +
>> +            if ( atomic_cmpxchg(&pending_count, 0, n_cpus + 1) == 0 )
>> +                break;
>> +
>> +            put_cpu_maps();
>> +        }
>> +
>> +        process_pending_softirqs();
>> +        cpu_relax();
>> +    }
>> +
>> +    smp_mb__before_atomic();
> 
> Our semantic of atomic_cmpxchg() is exactly the same as Linux. I.e
> it will contain a full barrier when the cmpxchg succeed. So why do you need this barrier?

It was me, I think, who (wrongly) suggested a barrier was missing
here, sorry.

Jan
Jürgen Groß March 26, 2020, 7:24 a.m. UTC | #4
On 26.03.20 07:58, Jan Beulich wrote:
> On 25.03.2020 17:13, Julien Grall wrote:
>> On 25/03/2020 10:55, Juergen Gross wrote:
>>> @@ -143,51 +143,90 @@ static int qhimark = 10000;
>>>    static int qlowmark = 100;
>>>    static int rsinterval = 1000;
>>>    -struct rcu_barrier_data {
>>> -    struct rcu_head head;
>>> -    atomic_t *cpu_count;
>>> -};
>>> +/*
>>> + * rcu_barrier() handling:
>>> + * Two counters are used to synchronize rcu_barrier() work:
>>> + * - cpu_count holds the number of cpus required to finish barrier handling.
>>> + *   It is decremented by each cpu when it has performed all pending rcu calls.
>>> + * - pending_count shows whether any rcu_barrier() activity is running and
>>> + *   it is used to synchronize leaving rcu_barrier() only after all cpus
>>> + *   have finished their processing. pending_count is initialized to nr_cpus + 1
>>> + *   and it is decremented by each cpu when it has seen that cpu_count has
>>> + *   reached 0. The cpu where rcu_barrier() has been called will wait until
>>> + *   pending_count has been decremented to 1 (so all cpus have seen cpu_count
>>> + *   reaching 0) and will then set pending_count to 0 indicating there is no
>>> + *   rcu_barrier() running.
>>> + * Cpus are synchronized via softirq mechanism. rcu_barrier() is regarded to
>>> + * be active if pending_count is not zero. In case rcu_barrier() is called on
>>> + * multiple cpus it is enough to check for pending_count being not zero on entry
>>> + * and to call process_pending_softirqs() in a loop until pending_count drops to
>>> + * zero, before starting the new rcu_barrier() processing.
>>> + */
>>> +static atomic_t cpu_count = ATOMIC_INIT(0);
>>> +static atomic_t pending_count = ATOMIC_INIT(0);
>>>      static void rcu_barrier_callback(struct rcu_head *head)
>>>    {
>>> -    struct rcu_barrier_data *data = container_of(
>>> -        head, struct rcu_barrier_data, head);
>>> -    atomic_inc(data->cpu_count);
>>> +    smp_mb__before_atomic();     /* Make all writes visible to other cpus. */
>>
>> smp_mb__before_atomic() will order both read and write. However, the
>> comment suggest only the write are required to be ordered.
>>
>> So either the barrier is too strong or the comment is incorrect. Can
>> you clarify it?
> 
> Neither is the case, I guess: There simply is no smp_wmb__before_atomic()
> in Linux, and if we want to follow their model we shouldn't have one
> either. I'd rather take the comment to indicate that if one appeared, it
> could be used here.

Right. Currently we have the choice of either using
smp_mb__before_atomic() which is too strong for Arm, or smp_wmb() which
is too strong for x86.

> 
>>> +    atomic_dec(&cpu_count);
>>>    }
>>>    -static int rcu_barrier_action(void *_cpu_count)
>>> +static void rcu_barrier_action(void)
>>>    {
>>> -    struct rcu_barrier_data data = { .cpu_count = _cpu_count };
>>> -
>>> -    ASSERT(!local_irq_is_enabled());
>>> -    local_irq_enable();
>>> +    struct rcu_head head;
>>>          /*
>>>         * When callback is executed, all previously-queued RCU work on this CPU
>>> -     * is completed. When all CPUs have executed their callback, data.cpu_count
>>> -     * will have been incremented to include every online CPU.
>>> +     * is completed. When all CPUs have executed their callback, cpu_count
>>> +     * will have been decremented to 0.
>>>         */
>>> -    call_rcu(&data.head, rcu_barrier_callback);
>>> +    call_rcu(&head, rcu_barrier_callback);
>>>    -    while ( atomic_read(data.cpu_count) != num_online_cpus() )
>>> +    while ( atomic_read(&cpu_count) )
>>>        {
>>>            process_pending_softirqs();
>>>            cpu_relax();
>>>        }
>>>    -    local_irq_disable();
>>> -
>>> -    return 0;
>>> +    smp_mb__before_atomic();
>>> +    atomic_dec(&pending_count);
>>>    }
>>>    -/*
>>> - * As rcu_barrier() is using stop_machine_run() it is allowed to be used in
>>> - * idle context only (see comment for stop_machine_run()).
>>> - */
>>> -int rcu_barrier(void)
>>> +void rcu_barrier(void)
>>>    {
>>> -    atomic_t cpu_count = ATOMIC_INIT(0);
>>> -    return stop_machine_run(rcu_barrier_action, &cpu_count, NR_CPUS);
>>> +    unsigned int n_cpus;
>>> +
>>> +    ASSERT(!in_irq() && local_irq_is_enabled());
>>> +
>>> +    for ( ; ; )
>>> +    {
>>> +        if ( !atomic_read(&pending_count) && get_cpu_maps() )
>>> +        {
>>> +            n_cpus = num_online_cpus();
>>> +
>>> +            if ( atomic_cmpxchg(&pending_count, 0, n_cpus + 1) == 0 )
>>> +                break;
>>> +
>>> +            put_cpu_maps();
>>> +        }
>>> +
>>> +        process_pending_softirqs();
>>> +        cpu_relax();
>>> +    }
>>> +
>>> +    smp_mb__before_atomic();
>>
>> Our semantic of atomic_cmpxchg() is exactly the same as Linux. I.e
>> it will contain a full barrier when the cmpxchg succeed. So why do you need this barrier?
> 
> I was me I think to have (wrongly) suggested a barrier was missing
> here, sorry.

I'll update the patch dropping the barrier.


Juergen
Jan Beulich March 26, 2020, 8:49 a.m. UTC | #5
On 26.03.2020 08:24, Jürgen Groß wrote:
> On 26.03.20 07:58, Jan Beulich wrote:
>> On 25.03.2020 17:13, Julien Grall wrote:
>>> On 25/03/2020 10:55, Juergen Gross wrote:
>>>> @@ -143,51 +143,90 @@ static int qhimark = 10000;
>>>>    static int qlowmark = 100;
>>>>    static int rsinterval = 1000;
>>>>    -struct rcu_barrier_data {
>>>> -    struct rcu_head head;
>>>> -    atomic_t *cpu_count;
>>>> -};
>>>> +/*
>>>> + * rcu_barrier() handling:
>>>> + * Two counters are used to synchronize rcu_barrier() work:
>>>> + * - cpu_count holds the number of cpus required to finish barrier handling.
>>>> + *   It is decremented by each cpu when it has performed all pending rcu calls.
>>>> + * - pending_count shows whether any rcu_barrier() activity is running and
>>>> + *   it is used to synchronize leaving rcu_barrier() only after all cpus
>>>> + *   have finished their processing. pending_count is initialized to nr_cpus + 1
>>>> + *   and it is decremented by each cpu when it has seen that cpu_count has
>>>> + *   reached 0. The cpu where rcu_barrier() has been called will wait until
>>>> + *   pending_count has been decremented to 1 (so all cpus have seen cpu_count
>>>> + *   reaching 0) and will then set pending_count to 0 indicating there is no
>>>> + *   rcu_barrier() running.
>>>> + * Cpus are synchronized via softirq mechanism. rcu_barrier() is regarded to
>>>> + * be active if pending_count is not zero. In case rcu_barrier() is called on
>>>> + * multiple cpus it is enough to check for pending_count being not zero on entry
>>>> + * and to call process_pending_softirqs() in a loop until pending_count drops to
>>>> + * zero, before starting the new rcu_barrier() processing.
>>>> + */
>>>> +static atomic_t cpu_count = ATOMIC_INIT(0);
>>>> +static atomic_t pending_count = ATOMIC_INIT(0);
>>>>      static void rcu_barrier_callback(struct rcu_head *head)
>>>>    {
>>>> -    struct rcu_barrier_data *data = container_of(
>>>> -        head, struct rcu_barrier_data, head);
>>>> -    atomic_inc(data->cpu_count);
>>>> +    smp_mb__before_atomic();     /* Make all writes visible to other cpus. */
>>>
>>> smp_mb__before_atomic() will order both read and write. However, the
>>> comment suggest only the write are required to be ordered.
>>>
>>> So either the barrier is too strong or the comment is incorrect. Can
>>> you clarify it?
>>
>> Neither is the case, I guess: There simply is no smp_wmb__before_atomic()
>> in Linux, and if we want to follow their model we shouldn't have one
>> either. I'd rather take the comment to indicate that if one appeared, it
>> could be used here.
> 
> Right. Currently we have the choice of either using
> smp_mb__before_atomic() which is too strong for Arm, or smp_wmb() which
> is too strong for x86.

For x86 smp_wmb() is actually only very slightly too strong - it expands
to just barrier(), after all. So overall perhaps that's the better
choice here (with a suitable comment)?

Jan
Jürgen Groß March 26, 2020, 8:50 a.m. UTC | #6
On 26.03.20 09:49, Jan Beulich wrote:
> On 26.03.2020 08:24, Jürgen Groß wrote:
>> On 26.03.20 07:58, Jan Beulich wrote:
>>> On 25.03.2020 17:13, Julien Grall wrote:
>>>> On 25/03/2020 10:55, Juergen Gross wrote:
>>>>> @@ -143,51 +143,90 @@ static int qhimark = 10000;
>>>>>     static int qlowmark = 100;
>>>>>     static int rsinterval = 1000;
>>>>>     -struct rcu_barrier_data {
>>>>> -    struct rcu_head head;
>>>>> -    atomic_t *cpu_count;
>>>>> -};
>>>>> +/*
>>>>> + * rcu_barrier() handling:
>>>>> + * Two counters are used to synchronize rcu_barrier() work:
>>>>> + * - cpu_count holds the number of cpus required to finish barrier handling.
>>>>> + *   It is decremented by each cpu when it has performed all pending rcu calls.
>>>>> + * - pending_count shows whether any rcu_barrier() activity is running and
>>>>> + *   it is used to synchronize leaving rcu_barrier() only after all cpus
>>>>> + *   have finished their processing. pending_count is initialized to nr_cpus + 1
>>>>> + *   and it is decremented by each cpu when it has seen that cpu_count has
>>>>> + *   reached 0. The cpu where rcu_barrier() has been called will wait until
>>>>> + *   pending_count has been decremented to 1 (so all cpus have seen cpu_count
>>>>> + *   reaching 0) and will then set pending_count to 0 indicating there is no
>>>>> + *   rcu_barrier() running.
>>>>> + * Cpus are synchronized via softirq mechanism. rcu_barrier() is regarded to
>>>>> + * be active if pending_count is not zero. In case rcu_barrier() is called on
>>>>> + * multiple cpus it is enough to check for pending_count being not zero on entry
>>>>> + * and to call process_pending_softirqs() in a loop until pending_count drops to
>>>>> + * zero, before starting the new rcu_barrier() processing.
>>>>> + */
>>>>> +static atomic_t cpu_count = ATOMIC_INIT(0);
>>>>> +static atomic_t pending_count = ATOMIC_INIT(0);
>>>>>       static void rcu_barrier_callback(struct rcu_head *head)
>>>>>     {
>>>>> -    struct rcu_barrier_data *data = container_of(
>>>>> -        head, struct rcu_barrier_data, head);
>>>>> -    atomic_inc(data->cpu_count);
>>>>> +    smp_mb__before_atomic();     /* Make all writes visible to other cpus. */
>>>>
>>>> smp_mb__before_atomic() will order both read and write. However, the
>>>> comment suggest only the write are required to be ordered.
>>>>
>>>> So either the barrier is too strong or the comment is incorrect. Can
>>>> you clarify it?
>>>
>>> Neither is the case, I guess: There simply is no smp_wmb__before_atomic()
>>> in Linux, and if we want to follow their model we shouldn't have one
>>> either. I'd rather take the comment to indicate that if one appeared, it
>>> could be used here.
>>
>> Right. Currently we have the choice of either using
>> smp_mb__before_atomic() which is too strong for Arm, or smp_wmb() which
>> is too strong for x86.
> 
> For x86 smp_wmb() is actually only very slightly too strong - it expands
> to just barrier(), after all. So overall perhaps that's the better
> choice here (with a suitable comment)?

Fine with me.


Juergen
Julien Grall March 26, 2020, 9:14 a.m. UTC | #7
On 26/03/2020 08:50, Jürgen Groß wrote:
> On 26.03.20 09:49, Jan Beulich wrote:
>> On 26.03.2020 08:24, Jürgen Groß wrote:
>>> On 26.03.20 07:58, Jan Beulich wrote:
>>>> On 25.03.2020 17:13, Julien Grall wrote:
>>>>> On 25/03/2020 10:55, Juergen Gross wrote:
>>>>>> @@ -143,51 +143,90 @@ static int qhimark = 10000;
>>>>>>     static int qlowmark = 100;
>>>>>>     static int rsinterval = 1000;
>>>>>>     -struct rcu_barrier_data {
>>>>>> -    struct rcu_head head;
>>>>>> -    atomic_t *cpu_count;
>>>>>> -};
>>>>>> +/*
>>>>>> + * rcu_barrier() handling:
>>>>>> + * Two counters are used to synchronize rcu_barrier() work:
>>>>>> + * - cpu_count holds the number of cpus required to finish 
>>>>>> barrier handling.
>>>>>> + *   It is decremented by each cpu when it has performed all 
>>>>>> pending rcu calls.
>>>>>> + * - pending_count shows whether any rcu_barrier() activity is 
>>>>>> running and
>>>>>> + *   it is used to synchronize leaving rcu_barrier() only after 
>>>>>> all cpus
>>>>>> + *   have finished their processing. pending_count is initialized 
>>>>>> to nr_cpus + 1
>>>>>> + *   and it is decremented by each cpu when it has seen that 
>>>>>> cpu_count has
>>>>>> + *   reached 0. The cpu where rcu_barrier() has been called will 
>>>>>> wait until
>>>>>> + *   pending_count has been decremented to 1 (so all cpus have 
>>>>>> seen cpu_count
>>>>>> + *   reaching 0) and will then set pending_count to 0 indicating 
>>>>>> there is no
>>>>>> + *   rcu_barrier() running.
>>>>>> + * Cpus are synchronized via softirq mechanism. rcu_barrier() is 
>>>>>> regarded to
>>>>>> + * be active if pending_count is not zero. In case rcu_barrier() 
>>>>>> is called on
>>>>>> + * multiple cpus it is enough to check for pending_count being 
>>>>>> not zero on entry
>>>>>> + * and to call process_pending_softirqs() in a loop until 
>>>>>> pending_count drops to
>>>>>> + * zero, before starting the new rcu_barrier() processing.
>>>>>> + */
>>>>>> +static atomic_t cpu_count = ATOMIC_INIT(0);
>>>>>> +static atomic_t pending_count = ATOMIC_INIT(0);
>>>>>>       static void rcu_barrier_callback(struct rcu_head *head)
>>>>>>     {
>>>>>> -    struct rcu_barrier_data *data = container_of(
>>>>>> -        head, struct rcu_barrier_data, head);
>>>>>> -    atomic_inc(data->cpu_count);
>>>>>> +    smp_mb__before_atomic();     /* Make all writes visible to 
>>>>>> other cpus. */
>>>>>
>>>>> smp_mb__before_atomic() will order both read and write. However, the
>>>>> comment suggest only the write are required to be ordered.
>>>>>
>>>>> So either the barrier is too strong or the comment is incorrect. Can
>>>>> you clarify it?
>>>>
>>>> Neither is the case, I guess: There simply is no 
>>>> smp_wmb__before_atomic()
>>>> in Linux, and if we want to follow their model we shouldn't have one
>>>> either. I'd rather take the comment to indicate that if one 
>>>> appeared, it
>>>> could be used here.
>>>
>>> Right. Currently we have the choice of either using
>>> smp_mb__before_atomic() which is too strong for Arm, or smp_wmb() which
>>> is too strong for x86.
>>
>> For x86 smp_wmb() is actually only very slightly too strong - it expands
>> to just barrier(), after all. So overall perhaps that's the better
>> choice here (with a suitable comment)?
> 
> Fine with me.

I am happy with that.

Cheers,
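
Summarizing the outcome of the barrier discussion above in code: in a follow-up
revision the callback-side barrier would presumably become an smp_wmb() plus a
comment (on x86 that expands to just barrier()), and the smp_mb__before_atomic()
after the successful atomic_cmpxchg() in rcu_barrier() would be dropped, as a
successful cmpxchg already acts as a full barrier. A rough, excerpted sketch of
the two touched spots (the actual next revision may of course differ):

static void rcu_barrier_callback(struct rcu_head *head)
{
    /*
     * Make the writes of the previously finished RCU callbacks visible to
     * the cpu waiting for cpu_count to drop; a write barrier is sufficient
     * (and on x86 smp_wmb() is just a compiler barrier).
     */
    smp_wmb();
    atomic_dec(&cpu_count);
}

/* In rcu_barrier(), right after the successful atomic_cmpxchg() of pending_count: */
    /* No extra barrier needed: a successful cmpxchg already is a full barrier. */
    atomic_set(&cpu_count, n_cpus);
    cpumask_raise_softirq(&cpu_online_map, RCU_SOFTIRQ);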

Patch

diff --git a/xen/common/rcupdate.c b/xen/common/rcupdate.c
index 03d84764d2..12b89565d0 100644
--- a/xen/common/rcupdate.c
+++ b/xen/common/rcupdate.c
@@ -83,7 +83,6 @@  struct rcu_data {
     struct rcu_head **donetail;
     long            blimit;           /* Upper limit on a processed batch */
     int cpu;
-    struct rcu_head barrier;
     long            last_rs_qlen;     /* qlen during the last resched */
 
     /* 3) idle CPUs handling */
@@ -91,6 +90,7 @@  struct rcu_data {
     bool idle_timer_active;
 
     bool            process_callbacks;
+    bool            barrier_active;
 };
 
 /*
@@ -143,51 +143,90 @@  static int qhimark = 10000;
 static int qlowmark = 100;
 static int rsinterval = 1000;
 
-struct rcu_barrier_data {
-    struct rcu_head head;
-    atomic_t *cpu_count;
-};
+/*
+ * rcu_barrier() handling:
+ * Two counters are used to synchronize rcu_barrier() work:
+ * - cpu_count holds the number of cpus required to finish barrier handling.
+ *   It is decremented by each cpu when it has performed all pending rcu calls.
+ * - pending_count shows whether any rcu_barrier() activity is running and
+ *   it is used to synchronize leaving rcu_barrier() only after all cpus
+ *   have finished their processing. pending_count is initialized to nr_cpus + 1
+ *   and it is decremented by each cpu when it has seen that cpu_count has
+ *   reached 0. The cpu where rcu_barrier() has been called will wait until
+ *   pending_count has been decremented to 1 (so all cpus have seen cpu_count
+ *   reaching 0) and will then set pending_count to 0 indicating there is no
+ *   rcu_barrier() running.
+ * Cpus are synchronized via softirq mechanism. rcu_barrier() is regarded to
+ * be active if pending_count is not zero. In case rcu_barrier() is called on
+ * multiple cpus it is enough to check for pending_count being not zero on entry
+ * and to call process_pending_softirqs() in a loop until pending_count drops to
+ * zero, before starting the new rcu_barrier() processing.
+ */
+static atomic_t cpu_count = ATOMIC_INIT(0);
+static atomic_t pending_count = ATOMIC_INIT(0);
 
 static void rcu_barrier_callback(struct rcu_head *head)
 {
-    struct rcu_barrier_data *data = container_of(
-        head, struct rcu_barrier_data, head);
-    atomic_inc(data->cpu_count);
+    smp_mb__before_atomic();     /* Make all writes visible to other cpus. */
+    atomic_dec(&cpu_count);
 }
 
-static int rcu_barrier_action(void *_cpu_count)
+static void rcu_barrier_action(void)
 {
-    struct rcu_barrier_data data = { .cpu_count = _cpu_count };
-
-    ASSERT(!local_irq_is_enabled());
-    local_irq_enable();
+    struct rcu_head head;
 
     /*
      * When callback is executed, all previously-queued RCU work on this CPU
-     * is completed. When all CPUs have executed their callback, data.cpu_count
-     * will have been incremented to include every online CPU.
+     * is completed. When all CPUs have executed their callback, cpu_count
+     * will have been decremented to 0.
      */
-    call_rcu(&data.head, rcu_barrier_callback);
+    call_rcu(&head, rcu_barrier_callback);
 
-    while ( atomic_read(data.cpu_count) != num_online_cpus() )
+    while ( atomic_read(&cpu_count) )
     {
         process_pending_softirqs();
         cpu_relax();
     }
 
-    local_irq_disable();
-
-    return 0;
+    smp_mb__before_atomic();
+    atomic_dec(&pending_count);
 }
 
-/*
- * As rcu_barrier() is using stop_machine_run() it is allowed to be used in
- * idle context only (see comment for stop_machine_run()).
- */
-int rcu_barrier(void)
+void rcu_barrier(void)
 {
-    atomic_t cpu_count = ATOMIC_INIT(0);
-    return stop_machine_run(rcu_barrier_action, &cpu_count, NR_CPUS);
+    unsigned int n_cpus;
+
+    ASSERT(!in_irq() && local_irq_is_enabled());
+
+    for ( ; ; )
+    {
+        if ( !atomic_read(&pending_count) && get_cpu_maps() )
+        {
+            n_cpus = num_online_cpus();
+
+            if ( atomic_cmpxchg(&pending_count, 0, n_cpus + 1) == 0 )
+                break;
+
+            put_cpu_maps();
+        }
+
+        process_pending_softirqs();
+        cpu_relax();
+    }
+
+    smp_mb__before_atomic();
+    atomic_set(&cpu_count, n_cpus);
+    cpumask_raise_softirq(&cpu_online_map, RCU_SOFTIRQ);
+
+    while ( atomic_read(&pending_count) != 1 )
+    {
+        process_pending_softirqs();
+        cpu_relax();
+    }
+
+    atomic_set(&pending_count, 0);
+
+    put_cpu_maps();
 }
 
 /* Is batch a before batch b ? */
@@ -426,6 +465,13 @@  static void rcu_process_callbacks(void)
         rdp->process_callbacks = false;
         __rcu_process_callbacks(&rcu_ctrlblk, rdp);
     }
+
+    if ( atomic_read(&cpu_count) && !rdp->barrier_active )
+    {
+        rdp->barrier_active = true;
+        rcu_barrier_action();
+        rdp->barrier_active = false;
+    }
 }
 
 static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
diff --git a/xen/include/xen/rcupdate.h b/xen/include/xen/rcupdate.h
index eb9b60df07..31c8b86d13 100644
--- a/xen/include/xen/rcupdate.h
+++ b/xen/include/xen/rcupdate.h
@@ -144,7 +144,7 @@  void rcu_check_callbacks(int cpu);
 void call_rcu(struct rcu_head *head, 
               void (*func)(struct rcu_head *head));
 
-int rcu_barrier(void);
+void rcu_barrier(void);
 
 void rcu_idle_enter(unsigned int cpu);
 void rcu_idle_exit(unsigned int cpu);
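
As a usage note on the prototype change: callers no longer get a return value
to check or propagate. A hypothetical teardown path (the names below are purely
illustrative and not taken from this series) would simply read:

/* Hypothetical caller; the example_*() helpers are made-up placeholders. */
static void example_teardown(void)
{
    /* Stop publishing new RCU-protected data first ... */
    unregister_example();

    /* ... then wait until every pending RCU callback on every cpu has run. */
    rcu_barrier();

    free_example_state();
}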