
[v6,1/4] xen/rcu: don't use stop_machine_run() for rcu_barrier()

Message ID 20200313130614.27265-2-jgross@suse.com (mailing list archive)
State Superseded
Series xen/rcu: let rcu work better with core scheduling

Commit Message

Juergen Gross March 13, 2020, 1:06 p.m. UTC
Today rcu_barrier() calls stop_machine_run() to synchronize all
physical cpus in order to ensure that all pending rcu calls have
finished when it returns.

As stop_machine_run() is using tasklets, this requires scheduling idle
vcpus on all cpus, which in turn means that with core scheduling active
rcu_barrier() may be called from idle cpus only, as otherwise a
scheduling deadlock would occur.

There is no need at all to do the syncing of the cpus in tasklets, as
rcu activity is started in __do_softirq(), which is called whenever
softirq activity is allowed. So rcu_barrier() can easily be modified to
use softirqs for synchronizing the cpus, no longer requiring any
scheduling activity.

As there already is an rcu softirq, reuse it for the synchronization.

Remove the barrier element from struct rcu_data as it is no longer
needed.

Finally switch rcu_barrier() to return void, as it can now never fail.

Partially-based-on-patch-by: Igor Druzhinin <igor.druzhinin@citrix.com>
Signed-off-by: Juergen Gross <jgross@suse.com>
---
V2:
- add recursion detection

V3:
- fix races (Igor Druzhinin)

V5:
- rename done_count to pending_count (Jan Beulich)
- fix race (Jan Beulich)

V6:
- add barrier (Julien Grall)
- add ASSERT() (Julien Grall)
- hold cpu_map lock until end of rcu_barrier() (Julien Grall)
---
 xen/common/rcupdate.c      | 95 +++++++++++++++++++++++++++++++++-------------
 xen/include/xen/rcupdate.h |  2 +-
 2 files changed, 69 insertions(+), 28 deletions(-)
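
A minimal sketch of the scheme described above, written as a stand-alone
C11 model rather than Xen code (threads stand in for cpus, thrd_yield()
for process_pending_softirqs()/cpu_relax(); all names below are
illustrative only):

    /*
     * pending_count gates entry to the barrier, cpu_count counts the
     * callbacks still outstanding.
     */
    #include <stdatomic.h>
    #include <stdio.h>
    #include <threads.h>

    #define N_CPUS 4

    static atomic_int cpu_count;      /* callbacks still to run          */
    static atomic_int pending_count;  /* 0 = no barrier currently active */

    static int cpu_thread(void *arg)  /* models the per-cpu softirq work */
    {
        int cpu = *(int *)arg;

        printf("cpu %d: callback done\n", cpu);
        atomic_fetch_sub(&cpu_count, 1);     /* rcu_barrier_callback()   */

        while ( atomic_load(&cpu_count) )    /* wait for the other cpus  */
            thrd_yield();

        atomic_fetch_sub(&pending_count, 1); /* drop out of the barrier  */
        return 0;
    }

    int main(void)
    {
        thrd_t t[N_CPUS];
        int ids[N_CPUS], expected = 0;

        /* Initiator: claim the barrier by moving pending_count 0 -> n + 1. */
        if ( !atomic_compare_exchange_strong(&pending_count, &expected,
                                             N_CPUS + 1) )
            return 1;                 /* another barrier is already active */

        atomic_store(&cpu_count, N_CPUS);    /* arm one callback per cpu */

        for ( int i = 0; i < N_CPUS; i++ )
        {
            ids[i] = i;
            thrd_create(&t[i], cpu_thread, &ids[i]);
        }

        while ( atomic_load(&pending_count) != 1 ) /* all cpus dropped out? */
            thrd_yield();

        atomic_store(&pending_count, 0);     /* re-enable rcu_barrier()  */

        for ( int i = 0; i < N_CPUS; i++ )
            thrd_join(t[i], NULL);

        printf("barrier complete\n");
        return 0;
    }

The real patch differs mainly in that the initiating cpu is itself one
of the n_cpus executing the callback, and in that it holds the cpu maps
read lock across the whole operation so the set of online cpus cannot
change underneath it.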

Comments

Igor Druzhinin March 16, 2020, 3:24 p.m. UTC | #1
On 13/03/2020 13:06, Juergen Gross wrote:
> Today rcu_barrier() is calling stop_machine_run() to synchronize all
> physical cpus in order to ensure all pending rcu calls have finished
> when returning.
> 
> As stop_machine_run() is using tasklets this requires scheduling of
> idle vcpus on all cpus imposing the need to call rcu_barrier() on idle
> cpus only in case of core scheduling being active, as otherwise a
> scheduling deadlock would occur.
> 
> There is no need at all to do the syncing of the cpus in tasklets, as
> rcu activity is started in __do_softirq() called whenever softirq
> activity is allowed. So rcu_barrier() can easily be modified to use
> softirq for synchronization of the cpus no longer requiring any
> scheduling activity.
> 
> As there already is a rcu softirq reuse that for the synchronization.
> 
> Remove the barrier element from struct rcu_data as it isn't used.
> 
> Finally switch rcu_barrier() to return void as it now can never fail.
> 
> Partially-based-on-patch-by: Igor Druzhinin <igor.druzhinin@citrix.com>
> Signed-off-by: Juergen Gross <jgross@suse.com>
> ---
> V2:
> - add recursion detection
> 
> V3:
> - fix races (Igor Druzhinin)
> 
> V5:
> - rename done_count to pending_count (Jan Beulich)
> - fix race (Jan Beulich)
> 
> V6:
> - add barrier (Julien Grall)
> - add ASSERT() (Julien Grall)
> - hold cpu_map lock until end of rcu_barrier() (Julien Grall)
> ---
>   xen/common/rcupdate.c      | 95 +++++++++++++++++++++++++++++++++-------------
>   xen/include/xen/rcupdate.h |  2 +-
>   2 files changed, 69 insertions(+), 28 deletions(-)
> 
> diff --git a/xen/common/rcupdate.c b/xen/common/rcupdate.c
> index 03d84764d2..ed9083d2b2 100644
> --- a/xen/common/rcupdate.c
> +++ b/xen/common/rcupdate.c
> @@ -83,7 +83,6 @@ struct rcu_data {
>       struct rcu_head **donetail;
>       long            blimit;           /* Upper limit on a processed batch */
>       int cpu;
> -    struct rcu_head barrier;
>       long            last_rs_qlen;     /* qlen during the last resched */
>   
>       /* 3) idle CPUs handling */
> @@ -91,6 +90,7 @@ struct rcu_data {
>       bool idle_timer_active;
>   
>       bool            process_callbacks;
> +    bool            barrier_active;
>   };
>   
>   /*
> @@ -143,51 +143,85 @@ static int qhimark = 10000;
>   static int qlowmark = 100;
>   static int rsinterval = 1000;
>   
> -struct rcu_barrier_data {
> -    struct rcu_head head;
> -    atomic_t *cpu_count;
> -};
> +/*
> + * rcu_barrier() handling:
> + * cpu_count holds the number of cpus required to finish barrier handling.
> + * pending_count is initialized to nr_cpus + 1.
> + * Cpus are synchronized via softirq mechanism. rcu_barrier() is regarded to
> + * be active if pending_count is not zero. In case rcu_barrier() is called on
> + * multiple cpus it is enough to check for pending_count being not zero on entry
> + * and to call process_pending_softirqs() in a loop until pending_count drops to
> + * zero, before starting the new rcu_barrier() processing.
> + * In order to avoid hangs when rcu_barrier() is called multiple times on the
> + * same cpu in fast sequence and a slave cpu couldn't drop out of the
> + * barrier handling fast enough a second counter pending_count is needed.
> + * The rcu_barrier() invoking cpu will wait until pending_count reaches 1
> + * (meaning that all cpus have finished processing the barrier) and then will
> + * reset pending_count to 0 to enable entering rcu_barrier() again.
> + */
> +static atomic_t cpu_count = ATOMIC_INIT(0);
> +static atomic_t pending_count = ATOMIC_INIT(0);
>   
>   static void rcu_barrier_callback(struct rcu_head *head)
>   {
> -    struct rcu_barrier_data *data = container_of(
> -        head, struct rcu_barrier_data, head);
> -    atomic_inc(data->cpu_count);
> +    smp_wmb();     /* Make all previous writes visible to other cpus. */
> +    atomic_dec(&cpu_count);
>   }
>   
> -static int rcu_barrier_action(void *_cpu_count)
> +static void rcu_barrier_action(void)
>   {
> -    struct rcu_barrier_data data = { .cpu_count = _cpu_count };
> -
> -    ASSERT(!local_irq_is_enabled());
> -    local_irq_enable();
> +    struct rcu_head head;
>   
>       /*
>        * When callback is executed, all previously-queued RCU work on this CPU
> -     * is completed. When all CPUs have executed their callback, data.cpu_count
> -     * will have been incremented to include every online CPU.
> +     * is completed. When all CPUs have executed their callback, cpu_count
> +     * will have been decremented to 0.
>        */
> -    call_rcu(&data.head, rcu_barrier_callback);
> +    call_rcu(&head, rcu_barrier_callback);
>   
> -    while ( atomic_read(data.cpu_count) != num_online_cpus() )
> +    while ( atomic_read(&cpu_count) )
>       {
>           process_pending_softirqs();
>           cpu_relax();
>       }
>   
> -    local_irq_disable();
> -
> -    return 0;
> +    atomic_dec(&pending_count);
>   }
>   
> -/*
> - * As rcu_barrier() is using stop_machine_run() it is allowed to be used in
> - * idle context only (see comment for stop_machine_run()).
> - */
> -int rcu_barrier(void)
> +void rcu_barrier(void)
>   {
> -    atomic_t cpu_count = ATOMIC_INIT(0);
> -    return stop_machine_run(rcu_barrier_action, &cpu_count, NR_CPUS);
> +    unsigned int n_cpus;
> +
> +    ASSERT(!in_irq() && local_irq_is_enabled());
> +
> +    for ( ;; )
> +    {
> +        if ( !atomic_read(&pending_count) && get_cpu_maps() )
> +        {

If the whole action is happening while cpu_maps are taken, why do you 
need to check pending_count first? I think the logic of this loop
could be simplified if this is taken into account.

Igor
Juergen Gross March 16, 2020, 4:01 p.m. UTC | #2
On 16.03.20 16:24, Igor Druzhinin wrote:
> On 13/03/2020 13:06, Juergen Gross wrote:
>> Today rcu_barrier() is calling stop_machine_run() to synchronize all
>> physical cpus in order to ensure all pending rcu calls have finished
>> when returning.
>>
>> As stop_machine_run() is using tasklets this requires scheduling of
>> idle vcpus on all cpus imposing the need to call rcu_barrier() on idle
>> cpus only in case of core scheduling being active, as otherwise a
>> scheduling deadlock would occur.
>>
>> There is no need at all to do the syncing of the cpus in tasklets, as
>> rcu activity is started in __do_softirq() called whenever softirq
>> activity is allowed. So rcu_barrier() can easily be modified to use
>> softirq for synchronization of the cpus no longer requiring any
>> scheduling activity.
>>
>> As there already is a rcu softirq reuse that for the synchronization.
>>
>> Remove the barrier element from struct rcu_data as it isn't used.
>>
>> Finally switch rcu_barrier() to return void as it now can never fail.
>>
>> Partially-based-on-patch-by: Igor Druzhinin <igor.druzhinin@citrix.com>
>> Signed-off-by: Juergen Gross <jgross@suse.com>
>> ---
>> V2:
>> - add recursion detection
>>
>> V3:
>> - fix races (Igor Druzhinin)
>>
>> V5:
>> - rename done_count to pending_count (Jan Beulich)
>> - fix race (Jan Beulich)
>>
>> V6:
>> - add barrier (Julien Grall)
>> - add ASSERT() (Julien Grall)
>> - hold cpu_map lock until end of rcu_barrier() (Julien Grall)
>> ---
>>    xen/common/rcupdate.c      | 95 +++++++++++++++++++++++++++++++++-------------
>>    xen/include/xen/rcupdate.h |  2 +-
>>    2 files changed, 69 insertions(+), 28 deletions(-)
>>
>> diff --git a/xen/common/rcupdate.c b/xen/common/rcupdate.c
>> index 03d84764d2..ed9083d2b2 100644
>> --- a/xen/common/rcupdate.c
>> +++ b/xen/common/rcupdate.c
>> @@ -83,7 +83,6 @@ struct rcu_data {
>>        struct rcu_head **donetail;
>>        long            blimit;           /* Upper limit on a processed batch */
>>        int cpu;
>> -    struct rcu_head barrier;
>>        long            last_rs_qlen;     /* qlen during the last resched */
>>    
>>        /* 3) idle CPUs handling */
>> @@ -91,6 +90,7 @@ struct rcu_data {
>>        bool idle_timer_active;
>>    
>>        bool            process_callbacks;
>> +    bool            barrier_active;
>>    };
>>    
>>    /*
>> @@ -143,51 +143,85 @@ static int qhimark = 10000;
>>    static int qlowmark = 100;
>>    static int rsinterval = 1000;
>>    
>> -struct rcu_barrier_data {
>> -    struct rcu_head head;
>> -    atomic_t *cpu_count;
>> -};
>> +/*
>> + * rcu_barrier() handling:
>> + * cpu_count holds the number of cpus required to finish barrier handling.
>> + * pending_count is initialized to nr_cpus + 1.
>> + * Cpus are synchronized via softirq mechanism. rcu_barrier() is regarded to
>> + * be active if pending_count is not zero. In case rcu_barrier() is called on
>> + * multiple cpus it is enough to check for pending_count being not zero on entry
>> + * and to call process_pending_softirqs() in a loop until pending_count drops to
>> + * zero, before starting the new rcu_barrier() processing.
>> + * In order to avoid hangs when rcu_barrier() is called multiple times on the
>> + * same cpu in fast sequence and a slave cpu couldn't drop out of the
>> + * barrier handling fast enough a second counter pending_count is needed.
>> + * The rcu_barrier() invoking cpu will wait until pending_count reaches 1
>> + * (meaning that all cpus have finished processing the barrier) and then will
>> + * reset pending_count to 0 to enable entering rcu_barrier() again.
>> + */
>> +static atomic_t cpu_count = ATOMIC_INIT(0);
>> +static atomic_t pending_count = ATOMIC_INIT(0);
>>    
>>    static void rcu_barrier_callback(struct rcu_head *head)
>>    {
>> -    struct rcu_barrier_data *data = container_of(
>> -        head, struct rcu_barrier_data, head);
>> -    atomic_inc(data->cpu_count);
>> +    smp_wmb();     /* Make all previous writes visible to other cpus. */
>> +    atomic_dec(&cpu_count);
>>    }
>>    
>> -static int rcu_barrier_action(void *_cpu_count)
>> +static void rcu_barrier_action(void)
>>    {
>> -    struct rcu_barrier_data data = { .cpu_count = _cpu_count };
>> -
>> -    ASSERT(!local_irq_is_enabled());
>> -    local_irq_enable();
>> +    struct rcu_head head;
>>    
>>        /*
>>         * When callback is executed, all previously-queued RCU work on this CPU
>> -     * is completed. When all CPUs have executed their callback, data.cpu_count
>> -     * will have been incremented to include every online CPU.
>> +     * is completed. When all CPUs have executed their callback, cpu_count
>> +     * will have been decremented to 0.
>>         */
>> -    call_rcu(&data.head, rcu_barrier_callback);
>> +    call_rcu(&head, rcu_barrier_callback);
>>    
>> -    while ( atomic_read(data.cpu_count) != num_online_cpus() )
>> +    while ( atomic_read(&cpu_count) )
>>        {
>>            process_pending_softirqs();
>>            cpu_relax();
>>        }
>>    
>> -    local_irq_disable();
>> -
>> -    return 0;
>> +    atomic_dec(&pending_count);
>>    }
>>    
>> -/*
>> - * As rcu_barrier() is using stop_machine_run() it is allowed to be used in
>> - * idle context only (see comment for stop_machine_run()).
>> - */
>> -int rcu_barrier(void)
>> +void rcu_barrier(void)
>>    {
>> -    atomic_t cpu_count = ATOMIC_INIT(0);
>> -    return stop_machine_run(rcu_barrier_action, &cpu_count, NR_CPUS);
>> +    unsigned int n_cpus;
>> +
>> +    ASSERT(!in_irq() && local_irq_is_enabled());
>> +
>> +    for ( ;; )
>> +    {
>> +        if ( !atomic_read(&pending_count) && get_cpu_maps() )
>> +        {
> 
> If the whole action is happening while cpu_maps are taken why do you
> need to check pending_count first? I think the logic of this loop
> could be simplified if taken this into account.

get_cpu_maps() can be successful on multiple cpus (it's a read_lock()).
Testing pending_count avoids hammering on the cache lines.


Juergen
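
As an aside, the pattern described here (a cheap read of pending_count
in front of the read-lock attempt, so contending cpus mostly spin on a
shared read instead of bouncing the lock's cache line) can be sketched
outside of Xen roughly as below, with a pthread rwlock standing in for
get_cpu_maps(); names and details are illustrative only:

    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdbool.h>

    static pthread_rwlock_t cpu_maps_lock = PTHREAD_RWLOCK_INITIALIZER;
    static atomic_int pending_count;    /* 0 = no barrier in flight */

    static bool try_enter_barrier(int n_cpus)
    {
        int expected = 0;

        /* Cheap read first: if a barrier is already pending there is no
         * point in touching the rwlock's cache line at all. */
        if ( atomic_load(&pending_count) )
            return false;

        if ( pthread_rwlock_tryrdlock(&cpu_maps_lock) )
            return false;               /* lock not available right now */

        if ( atomic_compare_exchange_strong(&pending_count, &expected,
                                            n_cpus + 1) )
            return true;                /* we own the barrier, lock held */

        pthread_rwlock_unlock(&cpu_maps_lock);
        return false;                   /* lost the race to another cpu */
    }

A caller would loop on try_enter_barrier(), processing pending softirqs
between attempts, much like the for ( ; ; ) loop in the patch.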
Igor Druzhinin March 16, 2020, 4:21 p.m. UTC | #3
On 16/03/2020 16:01, Jürgen Groß wrote:
> On 16.03.20 16:24, Igor Druzhinin wrote:
>> On 13/03/2020 13:06, Juergen Gross wrote:
>>> Today rcu_barrier() is calling stop_machine_run() to synchronize all
>>> physical cpus in order to ensure all pending rcu calls have finished
>>> when returning.
>>>
>>> As stop_machine_run() is using tasklets this requires scheduling of
>>> idle vcpus on all cpus imposing the need to call rcu_barrier() on idle
>>> cpus only in case of core scheduling being active, as otherwise a
>>> scheduling deadlock would occur.
>>>
>>> There is no need at all to do the syncing of the cpus in tasklets, as
>>> rcu activity is started in __do_softirq() called whenever softirq
>>> activity is allowed. So rcu_barrier() can easily be modified to use
>>> softirq for synchronization of the cpus no longer requiring any
>>> scheduling activity.
>>>
>>> As there already is a rcu softirq reuse that for the synchronization.
>>>
>>> Remove the barrier element from struct rcu_data as it isn't used.
>>>
>>> Finally switch rcu_barrier() to return void as it now can never fail.
>>>
>>> Partially-based-on-patch-by: Igor Druzhinin <igor.druzhinin@citrix.com>
>>> Signed-off-by: Juergen Gross <jgross@suse.com>
>>> ---
>>> V2:
>>> - add recursion detection
>>>
>>> V3:
>>> - fix races (Igor Druzhinin)
>>>
>>> V5:
>>> - rename done_count to pending_count (Jan Beulich)
>>> - fix race (Jan Beulich)
>>>
>>> V6:
>>> - add barrier (Julien Grall)
>>> - add ASSERT() (Julien Grall)
>>> - hold cpu_map lock until end of rcu_barrier() (Julien Grall)
>>> ---
>>>    xen/common/rcupdate.c      | 95 
>>> +++++++++++++++++++++++++++++++++-------------
>>>    xen/include/xen/rcupdate.h |  2 +-
>>>    2 files changed, 69 insertions(+), 28 deletions(-)
>>>
>>> diff --git a/xen/common/rcupdate.c b/xen/common/rcupdate.c
>>> index 03d84764d2..ed9083d2b2 100644
>>> --- a/xen/common/rcupdate.c
>>> +++ b/xen/common/rcupdate.c
>>> @@ -83,7 +83,6 @@ struct rcu_data {
>>>        struct rcu_head **donetail;
>>>        long            blimit;           /* Upper limit on a 
>>> processed batch */
>>>        int cpu;
>>> -    struct rcu_head barrier;
>>>        long            last_rs_qlen;     /* qlen during the last 
>>> resched */
>>>        /* 3) idle CPUs handling */
>>> @@ -91,6 +90,7 @@ struct rcu_data {
>>>        bool idle_timer_active;
>>>        bool            process_callbacks;
>>> +    bool            barrier_active;
>>>    };
>>>    /*
>>> @@ -143,51 +143,85 @@ static int qhimark = 10000;
>>>    static int qlowmark = 100;
>>>    static int rsinterval = 1000;
>>> -struct rcu_barrier_data {
>>> -    struct rcu_head head;
>>> -    atomic_t *cpu_count;
>>> -};
>>> +/*
>>> + * rcu_barrier() handling:
>>> + * cpu_count holds the number of cpus required to finish barrier 
>>> handling.
>>> + * pending_count is initialized to nr_cpus + 1.
>>> + * Cpus are synchronized via softirq mechanism. rcu_barrier() is 
>>> regarded to
>>> + * be active if pending_count is not zero. In case rcu_barrier() is 
>>> called on
>>> + * multiple cpus it is enough to check for pending_count being not 
>>> zero on entry
>>> + * and to call process_pending_softirqs() in a loop until 
>>> pending_count drops to
>>> + * zero, before starting the new rcu_barrier() processing.
>>> + * In order to avoid hangs when rcu_barrier() is called multiple 
>>> times on the
>>> + * same cpu in fast sequence and a slave cpu couldn't drop out of the
>>> + * barrier handling fast enough a second counter pending_count is 
>>> needed.
>>> + * The rcu_barrier() invoking cpu will wait until pending_count 
>>> reaches 1
>>> + * (meaning that all cpus have finished processing the barrier) and 
>>> then will
>>> + * reset pending_count to 0 to enable entering rcu_barrier() again.
>>> + */
>>> +static atomic_t cpu_count = ATOMIC_INIT(0);
>>> +static atomic_t pending_count = ATOMIC_INIT(0);
>>>    static void rcu_barrier_callback(struct rcu_head *head)
>>>    {
>>> -    struct rcu_barrier_data *data = container_of(
>>> -        head, struct rcu_barrier_data, head);
>>> -    atomic_inc(data->cpu_count);
>>> +    smp_wmb();     /* Make all previous writes visible to other 
>>> cpus. */
>>> +    atomic_dec(&cpu_count);
>>>    }
>>> -static int rcu_barrier_action(void *_cpu_count)
>>> +static void rcu_barrier_action(void)
>>>    {
>>> -    struct rcu_barrier_data data = { .cpu_count = _cpu_count };
>>> -
>>> -    ASSERT(!local_irq_is_enabled());
>>> -    local_irq_enable();
>>> +    struct rcu_head head;
>>>        /*
>>>         * When callback is executed, all previously-queued RCU work 
>>> on this CPU
>>> -     * is completed. When all CPUs have executed their callback, 
>>> data.cpu_count
>>> -     * will have been incremented to include every online CPU.
>>> +     * is completed. When all CPUs have executed their callback, 
>>> cpu_count
>>> +     * will have been decremented to 0.
>>>         */
>>> -    call_rcu(&data.head, rcu_barrier_callback);
>>> +    call_rcu(&head, rcu_barrier_callback);
>>> -    while ( atomic_read(data.cpu_count) != num_online_cpus() )
>>> +    while ( atomic_read(&cpu_count) )
>>>        {
>>>            process_pending_softirqs();
>>>            cpu_relax();
>>>        }
>>> -    local_irq_disable();
>>> -
>>> -    return 0;
>>> +    atomic_dec(&pending_count);
>>>    }
>>> -/*
>>> - * As rcu_barrier() is using stop_machine_run() it is allowed to be 
>>> used in
>>> - * idle context only (see comment for stop_machine_run()).
>>> - */
>>> -int rcu_barrier(void)
>>> +void rcu_barrier(void)
>>>    {
>>> -    atomic_t cpu_count = ATOMIC_INIT(0);
>>> -    return stop_machine_run(rcu_barrier_action, &cpu_count, NR_CPUS);
>>> +    unsigned int n_cpus;
>>> +
>>> +    ASSERT(!in_irq() && local_irq_is_enabled());
>>> +
>>> +    for ( ;; )
>>> +    {
>>> +        if ( !atomic_read(&pending_count) && get_cpu_maps() )
>>> +        {
>>
>> If the whole action is happening while cpu_maps are taken why do you
>> need to check pending_count first? I think the logic of this loop
>> could be simplified if taken this into account.
> 
> get_cpu_maps() can be successful on multiple cpus (its a read_lock()).
> Testing pending_count avoids hammering on the cache lines.

I see - the logic was changed recently. I'm currently testing this 
version of the patch.

Igor
Jan Beulich March 17, 2020, 1:56 p.m. UTC | #4
On 13.03.2020 14:06, Juergen Gross wrote:
> @@ -143,51 +143,85 @@ static int qhimark = 10000;
>  static int qlowmark = 100;
>  static int rsinterval = 1000;
>  
> -struct rcu_barrier_data {
> -    struct rcu_head head;
> -    atomic_t *cpu_count;
> -};
> +/*
> + * rcu_barrier() handling:
> + * cpu_count holds the number of cpus required to finish barrier handling.
> + * pending_count is initialized to nr_cpus + 1.
> + * Cpus are synchronized via softirq mechanism. rcu_barrier() is regarded to
> + * be active if pending_count is not zero. In case rcu_barrier() is called on
> + * multiple cpus it is enough to check for pending_count being not zero on entry
> + * and to call process_pending_softirqs() in a loop until pending_count drops to
> + * zero, before starting the new rcu_barrier() processing.

Everything up to here reads fine, but ...

> + * In order to avoid hangs when rcu_barrier() is called multiple times on the
> + * same cpu in fast sequence and a slave cpu couldn't drop out of the
> + * barrier handling fast enough a second counter pending_count is needed.
> + * The rcu_barrier() invoking cpu will wait until pending_count reaches 1
> + * (meaning that all cpus have finished processing the barrier) and then will
> + * reset pending_count to 0 to enable entering rcu_barrier() again.

... this starts as if pending_count hadn't been mentioned before at all,
which might end up being confusing (e.g. by making one suspect the text
has gone out of sync with the code, as happened to me).

> + */
> +static atomic_t cpu_count = ATOMIC_INIT(0);
> +static atomic_t pending_count = ATOMIC_INIT(0);
>  
>  static void rcu_barrier_callback(struct rcu_head *head)
>  {
> -    struct rcu_barrier_data *data = container_of(
> -        head, struct rcu_barrier_data, head);
> -    atomic_inc(data->cpu_count);
> +    smp_wmb();     /* Make all previous writes visible to other cpus. */
> +    atomic_dec(&cpu_count);

In Linux terms, wouldn't this be smp_mb__before_atomic()? If so,
perhaps better if we also introduce this and its "after" sibling.
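
(The ordering requirement under discussion can be illustrated with plain
C11 atomics: every write done before the "callback ran" decrement must
be visible to the cpu that sees cpu_count reach zero.  A release on the
decrement paired with an acquire on the read gives that guarantee; in
Linux/Xen terms this is roughly what the smp_wmb() (or the suggested
smp_mb__before_atomic()) in front of the atomic_dec() provides.  The
names below are made up for the illustration.)

    #include <stdatomic.h>

    static atomic_int cpu_count;
    static int payload;              /* stands for "all previous writes" */

    void callback_side(void)         /* like rcu_barrier_callback()      */
    {
        payload = 42;                                    /* earlier write */
        atomic_fetch_sub_explicit(&cpu_count, 1,
                                  memory_order_release); /* ordered after */
    }

    int waiter_side(void)            /* like the loop waiting on cpu_count */
    {
        while ( atomic_load_explicit(&cpu_count, memory_order_acquire) )
            ;                        /* spin until the count hits zero   */
        return payload;              /* guaranteed to observe 42         */
    }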

>  }
>  
> -static int rcu_barrier_action(void *_cpu_count)
> +static void rcu_barrier_action(void)
>  {
> -    struct rcu_barrier_data data = { .cpu_count = _cpu_count };
> -
> -    ASSERT(!local_irq_is_enabled());
> -    local_irq_enable();
> +    struct rcu_head head;
>  
>      /*
>       * When callback is executed, all previously-queued RCU work on this CPU
> -     * is completed. When all CPUs have executed their callback, data.cpu_count
> -     * will have been incremented to include every online CPU.
> +     * is completed. When all CPUs have executed their callback, cpu_count
> +     * will have been decremented to 0.
>       */
> -    call_rcu(&data.head, rcu_barrier_callback);
> +    call_rcu(&head, rcu_barrier_callback);
>  
> -    while ( atomic_read(data.cpu_count) != num_online_cpus() )
> +    while ( atomic_read(&cpu_count) )
>      {
>          process_pending_softirqs();
>          cpu_relax();
>      }
>  
> -    local_irq_disable();
> -
> -    return 0;
> +    atomic_dec(&pending_count);

Isn't there a barrier needed between the atomic_read() and this
atomic_dec()?

> +void rcu_barrier(void)
>  {
> -    atomic_t cpu_count = ATOMIC_INIT(0);
> -    return stop_machine_run(rcu_barrier_action, &cpu_count, NR_CPUS);
> +    unsigned int n_cpus;
> +
> +    ASSERT(!in_irq() && local_irq_is_enabled());
> +
> +    for ( ;; )

Nit: Canonically there ought to also be a blank between the two
semicolons.

> +    {
> +        if ( !atomic_read(&pending_count) && get_cpu_maps() )
> +        {
> +            n_cpus = num_online_cpus();
> +
> +            if ( atomic_cmpxchg(&pending_count, 0, n_cpus + 1) == 0 )
> +                break;
> +
> +            put_cpu_maps();
> +        }
> +
> +        process_pending_softirqs();
> +        cpu_relax();

Is this really needed after having invoked
process_pending_softirqs()?

> +    }
> +
> +    atomic_set(&cpu_count, n_cpus);

Isn't there a barrier needed ahead of this, to order it wrt the
cmpxchg?

> +    cpumask_raise_softirq(&cpu_online_map, RCU_SOFTIRQ);

Isn't there another barrier needed ahead of this, to order it wrt
the set?

Jan
Juergen Gross March 19, 2020, 12:06 p.m. UTC | #5
On 17.03.20 14:56, Jan Beulich wrote:
> On 13.03.2020 14:06, Juergen Gross wrote:
>> @@ -143,51 +143,85 @@ static int qhimark = 10000;
>>   static int qlowmark = 100;
>>   static int rsinterval = 1000;
>>   
>> -struct rcu_barrier_data {
>> -    struct rcu_head head;
>> -    atomic_t *cpu_count;
>> -};
>> +/*
>> + * rcu_barrier() handling:
>> + * cpu_count holds the number of cpus required to finish barrier handling.
>> + * pending_count is initialized to nr_cpus + 1.
>> + * Cpus are synchronized via softirq mechanism. rcu_barrier() is regarded to
>> + * be active if pending_count is not zero. In case rcu_barrier() is called on
>> + * multiple cpus it is enough to check for pending_count being not zero on entry
>> + * and to call process_pending_softirqs() in a loop until pending_count drops to
>> + * zero, before starting the new rcu_barrier() processing.
> 
> Everything up to here reads fine, but ...
> 
>> + * In order to avoid hangs when rcu_barrier() is called multiple times on the
>> + * same cpu in fast sequence and a slave cpu couldn't drop out of the
>> + * barrier handling fast enough a second counter pending_count is needed.
>> + * The rcu_barrier() invoking cpu will wait until pending_count reaches 1
>> + * (meaning that all cpus have finished processing the barrier) and then will
>> + * reset pending_count to 0 to enable entering rcu_barrier() again.
> 
> ... this starts as if pending_count wasn't mentioned before at all,
> which might end up being confusing (e.g. suspecting the text having
> gone out of sync with the code, as has happened to me).

I'll reword the comment.

> 
>> + */
>> +static atomic_t cpu_count = ATOMIC_INIT(0);
>> +static atomic_t pending_count = ATOMIC_INIT(0);
>>   
>>   static void rcu_barrier_callback(struct rcu_head *head)
>>   {
>> -    struct rcu_barrier_data *data = container_of(
>> -        head, struct rcu_barrier_data, head);
>> -    atomic_inc(data->cpu_count);
>> +    smp_wmb();     /* Make all previous writes visible to other cpus. */
>> +    atomic_dec(&cpu_count);
> 
> In Linux terms, wouldn't this be smp_mb__before_atomic()? If so,
> perhaps better if we also introduce this and its "after" sibling.

Okay, will add a patch.

> 
>>   }
>>   
>> -static int rcu_barrier_action(void *_cpu_count)
>> +static void rcu_barrier_action(void)
>>   {
>> -    struct rcu_barrier_data data = { .cpu_count = _cpu_count };
>> -
>> -    ASSERT(!local_irq_is_enabled());
>> -    local_irq_enable();
>> +    struct rcu_head head;
>>   
>>       /*
>>        * When callback is executed, all previously-queued RCU work on this CPU
>> -     * is completed. When all CPUs have executed their callback, data.cpu_count
>> -     * will have been incremented to include every online CPU.
>> +     * is completed. When all CPUs have executed their callback, cpu_count
>> +     * will have been decremented to 0.
>>        */
>> -    call_rcu(&data.head, rcu_barrier_callback);
>> +    call_rcu(&head, rcu_barrier_callback);
>>   
>> -    while ( atomic_read(data.cpu_count) != num_online_cpus() )
>> +    while ( atomic_read(&cpu_count) )
>>       {
>>           process_pending_softirqs();
>>           cpu_relax();
>>       }
>>   
>> -    local_irq_disable();
>> -
>> -    return 0;
>> +    atomic_dec(&pending_count);
> 
> Isn't there a barrier needed between the atomic_read() and this
> atomic_dec()?

Yes, probably.

> 
>> +void rcu_barrier(void)
>>   {
>> -    atomic_t cpu_count = ATOMIC_INIT(0);
>> -    return stop_machine_run(rcu_barrier_action, &cpu_count, NR_CPUS);
>> +    unsigned int n_cpus;
>> +
>> +    ASSERT(!in_irq() && local_irq_is_enabled());
>> +
>> +    for ( ;; )
> 
> Nit: Canonically there ought to also be a blank between the two
> semicolons.

Okay.

> 
>> +    {
>> +        if ( !atomic_read(&pending_count) && get_cpu_maps() )
>> +        {
>> +            n_cpus = num_online_cpus();
>> +
>> +            if ( atomic_cmpxchg(&pending_count, 0, n_cpus + 1) == 0 )
>> +                break;
>> +
>> +            put_cpu_maps();
>> +        }
>> +
>> +        process_pending_softirqs();
>> +        cpu_relax();
> 
> Is this really needed after having invoked
> process_pending_softirqs()?

With no softirq pending, this loop might be rather tight. Better to give
a potential sibling a chance to make progress.
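
(For context: on x86, cpu_relax() boils down to the PAUSE instruction,
roughly as sketched below; the exact Xen definition may differ slightly.
PAUSE hints to the core that this thread is just spinning, letting an
SMT sibling use the shared execution resources in the meantime.)

    static inline void cpu_relax(void)
    {
        asm volatile ( "rep; nop" ::: "memory" );  /* "rep; nop" == PAUSE */
    }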

> 
>> +    }
>> +
>> +    atomic_set(&cpu_count, n_cpus);
> 
> Isn't there a barrier needed ahead of this, to order it wrt the
> cmpxchg?

I'll add one.

> 
>> +    cpumask_raise_softirq(&cpu_online_map, RCU_SOFTIRQ);
> 
> Isn't there another barrier needed ahead of this, to order it wrt
> the set?

No, I don't think so. cpumask_raise_softirq() needs to have appropriate
ordering semantics as otherwise the softirq pending bit wouldn't be
guaranteed to be seen by softirq processing.


Juergen
Jan Beulich March 19, 2020, 1:59 p.m. UTC | #6
On 19.03.2020 13:06, Jürgen Groß wrote:
> On 17.03.20 14:56, Jan Beulich wrote:
>> On 13.03.2020 14:06, Juergen Gross wrote:
>>> +    cpumask_raise_softirq(&cpu_online_map, RCU_SOFTIRQ);
>>
>> Isn't there another barrier needed ahead of this, to order it wrt
>> the set?
> 
> No, I don't think so. cpumask_raise_softirq() needs to have appropriate
> ordering semantics as otherwise the softirq pending bit wouldn't be
> guaranteed to be seen by softirq processing.

You may have a point here, but I had given my comment after
looking at cpumask_raise_softirq() and not finding any such
barrier there. Oh, actually - set_bit() and test_and_set_bit()
differ in their barrier characteristics; I wasn't aware of
this.

Jan
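
(The distinction referred to here, in Linux-style bitop terms which Xen
broadly follows: set_bit() is atomic but carries no ordering guarantee,
whereas test_and_set_bit(), being a value-returning RMW, acts as a full
barrier.  A rough C11 analogue, with softirq_pending as a made-up
stand-in for a cpu's pending mask:)

    #include <stdatomic.h>
    #include <stdbool.h>

    static atomic_ulong softirq_pending;

    /* set_bit()-like: atomic, but relaxed - no ordering against earlier
     * writes. */
    static void set_pending_no_barrier(unsigned int nr)
    {
        atomic_fetch_or_explicit(&softirq_pending, 1UL << nr,
                                 memory_order_relaxed);
    }

    /* test_and_set_bit()-like: value-returning RMW with full (seq_cst)
     * ordering, so earlier writes such as atomic_set(&cpu_count, n_cpus)
     * are visible before the bit can be observed as set. */
    static bool set_pending_with_barrier(unsigned int nr)
    {
        return atomic_fetch_or(&softirq_pending, 1UL << nr) & (1UL << nr);
    }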

Patch

diff --git a/xen/common/rcupdate.c b/xen/common/rcupdate.c
index 03d84764d2..ed9083d2b2 100644
--- a/xen/common/rcupdate.c
+++ b/xen/common/rcupdate.c
@@ -83,7 +83,6 @@  struct rcu_data {
     struct rcu_head **donetail;
     long            blimit;           /* Upper limit on a processed batch */
     int cpu;
-    struct rcu_head barrier;
     long            last_rs_qlen;     /* qlen during the last resched */
 
     /* 3) idle CPUs handling */
@@ -91,6 +90,7 @@  struct rcu_data {
     bool idle_timer_active;
 
     bool            process_callbacks;
+    bool            barrier_active;
 };
 
 /*
@@ -143,51 +143,85 @@  static int qhimark = 10000;
 static int qlowmark = 100;
 static int rsinterval = 1000;
 
-struct rcu_barrier_data {
-    struct rcu_head head;
-    atomic_t *cpu_count;
-};
+/*
+ * rcu_barrier() handling:
+ * cpu_count holds the number of cpus required to finish barrier handling.
+ * pending_count is initialized to nr_cpus + 1.
+ * Cpus are synchronized via softirq mechanism. rcu_barrier() is regarded to
+ * be active if pending_count is not zero. In case rcu_barrier() is called on
+ * multiple cpus it is enough to check for pending_count being not zero on entry
+ * and to call process_pending_softirqs() in a loop until pending_count drops to
+ * zero, before starting the new rcu_barrier() processing.
+ * In order to avoid hangs when rcu_barrier() is called multiple times on the
+ * same cpu in fast sequence and a slave cpu couldn't drop out of the
+ * barrier handling fast enough a second counter pending_count is needed.
+ * The rcu_barrier() invoking cpu will wait until pending_count reaches 1
+ * (meaning that all cpus have finished processing the barrier) and then will
+ * reset pending_count to 0 to enable entering rcu_barrier() again.
+ */
+static atomic_t cpu_count = ATOMIC_INIT(0);
+static atomic_t pending_count = ATOMIC_INIT(0);
 
 static void rcu_barrier_callback(struct rcu_head *head)
 {
-    struct rcu_barrier_data *data = container_of(
-        head, struct rcu_barrier_data, head);
-    atomic_inc(data->cpu_count);
+    smp_wmb();     /* Make all previous writes visible to other cpus. */
+    atomic_dec(&cpu_count);
 }
 
-static int rcu_barrier_action(void *_cpu_count)
+static void rcu_barrier_action(void)
 {
-    struct rcu_barrier_data data = { .cpu_count = _cpu_count };
-
-    ASSERT(!local_irq_is_enabled());
-    local_irq_enable();
+    struct rcu_head head;
 
     /*
      * When callback is executed, all previously-queued RCU work on this CPU
-     * is completed. When all CPUs have executed their callback, data.cpu_count
-     * will have been incremented to include every online CPU.
+     * is completed. When all CPUs have executed their callback, cpu_count
+     * will have been decremented to 0.
      */
-    call_rcu(&data.head, rcu_barrier_callback);
+    call_rcu(&head, rcu_barrier_callback);
 
-    while ( atomic_read(data.cpu_count) != num_online_cpus() )
+    while ( atomic_read(&cpu_count) )
     {
         process_pending_softirqs();
         cpu_relax();
     }
 
-    local_irq_disable();
-
-    return 0;
+    atomic_dec(&pending_count);
 }
 
-/*
- * As rcu_barrier() is using stop_machine_run() it is allowed to be used in
- * idle context only (see comment for stop_machine_run()).
- */
-int rcu_barrier(void)
+void rcu_barrier(void)
 {
-    atomic_t cpu_count = ATOMIC_INIT(0);
-    return stop_machine_run(rcu_barrier_action, &cpu_count, NR_CPUS);
+    unsigned int n_cpus;
+
+    ASSERT(!in_irq() && local_irq_is_enabled());
+
+    for ( ;; )
+    {
+        if ( !atomic_read(&pending_count) && get_cpu_maps() )
+        {
+            n_cpus = num_online_cpus();
+
+            if ( atomic_cmpxchg(&pending_count, 0, n_cpus + 1) == 0 )
+                break;
+
+            put_cpu_maps();
+        }
+
+        process_pending_softirqs();
+        cpu_relax();
+    }
+
+    atomic_set(&cpu_count, n_cpus);
+    cpumask_raise_softirq(&cpu_online_map, RCU_SOFTIRQ);
+
+    while ( atomic_read(&pending_count) != 1 )
+    {
+        process_pending_softirqs();
+        cpu_relax();
+    }
+
+    atomic_set(&pending_count, 0);
+
+    put_cpu_maps();
 }
 
 /* Is batch a before batch b ? */
@@ -426,6 +460,13 @@  static void rcu_process_callbacks(void)
         rdp->process_callbacks = false;
         __rcu_process_callbacks(&rcu_ctrlblk, rdp);
     }
+
+    if ( atomic_read(&cpu_count) && !rdp->barrier_active )
+    {
+        rdp->barrier_active = true;
+        rcu_barrier_action();
+        rdp->barrier_active = false;
+    }
 }
 
 static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
diff --git a/xen/include/xen/rcupdate.h b/xen/include/xen/rcupdate.h
index eb9b60df07..31c8b86d13 100644
--- a/xen/include/xen/rcupdate.h
+++ b/xen/include/xen/rcupdate.h
@@ -144,7 +144,7 @@  void rcu_check_callbacks(int cpu);
 void call_rcu(struct rcu_head *head, 
               void (*func)(struct rcu_head *head));
 
-int rcu_barrier(void);
+void rcu_barrier(void);
 
 void rcu_idle_enter(unsigned int cpu);
 void rcu_idle_exit(unsigned int cpu);
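
(A hypothetical usage sketch of the changed interface, not part of the
patch: a subsystem that frees objects via call_rcu() and wants all
outstanding callbacks finished before tearing down.  The widget names
are made up; only call_rcu(), rcu_barrier(), xfree() and container_of()
are real Xen interfaces.)

    struct widget {
        struct rcu_head rcu;
        /* ... payload ... */
    };

    static void widget_free_rcu(struct rcu_head *head)
    {
        xfree(container_of(head, struct widget, rcu));
    }

    void widget_release(struct widget *w)
    {
        call_rcu(&w->rcu, widget_free_rcu);   /* defer the actual free */
    }

    void widget_subsys_teardown(void)
    {
        /*
         * With this patch rcu_barrier() must not be called from interrupt
         * context and needs interrupts enabled, but it no longer has to
         * run in idle context and can no longer fail.
         */
        rcu_barrier();               /* all widget_free_rcu() calls done */
    }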