[v2,2/4] xen/rcu: don't use stop_machine_run() for rcu_barrier()

Message ID 20200218122114.17596-3-jgross@suse.com (mailing list archive)
State Superseded
Series xen/rcu: let rcu work better with core scheduling

Commit Message

Jürgen Groß Feb. 18, 2020, 12:21 p.m. UTC
Today rcu_barrier() calls stop_machine_run() to synchronize all
physical cpus, in order to ensure that all pending RCU calls have
finished when it returns.

As stop_machine_run() is based on tasklets, it requires scheduling the
idle vcpus on all cpus. With core scheduling active this means
rcu_barrier() may only be called on idle cpus, as otherwise a
scheduling deadlock would occur.

There is no need at all to do the syncing of the cpus in tasklets: RCU
activity is started in __do_softirq(), which is called whenever softirq
activity is allowed. So rcu_barrier() can easily be modified to use
softirqs for synchronizing the cpus, no longer requiring any scheduling
activity.
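
In outline the softirq based scheme looks like this (a condensed sketch
of the flow implemented by the patch below; all error handling and the
interaction with get_cpu_maps() is omitted):

    /* Initiating cpu: arm the counter, then kick all online cpus. */
    atomic_set(&cpu_count, num_online_cpus());
    cpumask_raise_softirq(&cpu_online_map, RCU_SOFTIRQ);

    /* Every cpu, from its RCU_SOFTIRQ handler: queue a callback which
     * runs only after all previously queued RCU work on that cpu has
     * completed. The callback just does atomic_dec(&cpu_count). */
    call_rcu(&head, rcu_barrier_callback);

    /* Initiating cpu: wait until each cpu's callback has run. */
    while ( atomic_read(&cpu_count) )
        process_pending_softirqs();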

As there already is an RCU softirq, reuse it for the synchronization.

Remove the barrier element from struct rcu_data as it isn't used.

Finally, switch rcu_barrier() to return void, as it can no longer fail.
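
For callers the change amounts to the following (illustrative only, no
caller is modified by this patch):

    -    if ( rcu_barrier() )    /* could fail, e.g. -EBUSY from   */
    -        /* handle error */; /* stop_machine_run()             */
    +    rcu_barrier();          /* cannot fail, simply waits      */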

Signed-off-by: Juergen Gross <jgross@suse.com>
---
V2:
- use get_cpu_maps()
- add recursion detection (see the call chain sketch after the patch)
---
 xen/common/rcupdate.c      | 72 ++++++++++++++++++++++++++++++++--------------
 xen/include/xen/rcupdate.h |  2 +-
 2 files changed, 51 insertions(+), 23 deletions(-)

Patch

diff --git a/xen/common/rcupdate.c b/xen/common/rcupdate.c
index 079ea9d8a1..e6add0b120 100644
--- a/xen/common/rcupdate.c
+++ b/xen/common/rcupdate.c
@@ -83,7 +83,6 @@  struct rcu_data {
     struct rcu_head **donetail;
     long            blimit;           /* Upper limit on a processed batch */
     int cpu;
-    struct rcu_head barrier;
     long            last_rs_qlen;     /* qlen during the last resched */
 
     /* 3) idle CPUs handling */
@@ -91,6 +90,7 @@  struct rcu_data {
     bool idle_timer_active;
 
     bool            process_callbacks;
+    bool            barrier_active;
 };
 
 /*
@@ -143,47 +143,68 @@  static int qhimark = 10000;
 static int qlowmark = 100;
 static int rsinterval = 1000;
 
-struct rcu_barrier_data {
-    struct rcu_head head;
-    atomic_t *cpu_count;
-};
+/*
+ * rcu_barrier() handling:
+ * cpu_count holds the number of cpus required to finish barrier handling.
+ * Cpus are synchronized via the softirq mechanism. rcu_barrier() is regarded
+ * as active if cpu_count is not zero. In case rcu_barrier() is called on
+ * multiple cpus it is enough to check for cpu_count being not zero on entry
+ * and to call process_pending_softirqs() in a loop until cpu_count drops to
+ * zero, as syncing has been requested already and we don't need to sync
+ * multiple times.
+ */
+static atomic_t cpu_count = ATOMIC_INIT(0);
 
 static void rcu_barrier_callback(struct rcu_head *head)
 {
-    struct rcu_barrier_data *data = container_of(
-        head, struct rcu_barrier_data, head);
-    atomic_inc(data->cpu_count);
+    atomic_dec(&cpu_count);
 }
 
-static int rcu_barrier_action(void *_cpu_count)
+static void rcu_barrier_action(void)
 {
-    struct rcu_barrier_data data = { .cpu_count = _cpu_count };
-
-    ASSERT(!local_irq_is_enabled());
-    local_irq_enable();
+    struct rcu_head head;
 
     /*
      * When callback is executed, all previously-queued RCU work on this CPU
-     * is completed. When all CPUs have executed their callback, data.cpu_count
-     * will have been incremented to include every online CPU.
+     * is completed. When all CPUs have executed their callback, cpu_count
+     * will have been decremented to zero.
      */
-    call_rcu(&data.head, rcu_barrier_callback);
+    call_rcu(&head, rcu_barrier_callback);
 
-    while ( atomic_read(data.cpu_count) != num_online_cpus() )
+    while ( atomic_read(&cpu_count) )
     {
         process_pending_softirqs();
         cpu_relax();
     }
-
-    local_irq_disable();
-
-    return 0;
 }
 
-int rcu_barrier(void)
+void rcu_barrier(void)
 {
-    atomic_t cpu_count = ATOMIC_INIT(0);
-    return stop_machine_run(rcu_barrier_action, &cpu_count, NR_CPUS);
+    int initial = atomic_read(&cpu_count);
+
+    while ( !get_cpu_maps() )
+    {
+        process_pending_softirqs();
+        if ( initial && !atomic_read(&cpu_count) )
+            return;
+
+        cpu_relax();
+        initial = atomic_read(&cpu_count);
+    }
+
+    if ( !initial )
+    {
+        atomic_set(&cpu_count, num_online_cpus());
+        cpumask_raise_softirq(&cpu_online_map, RCU_SOFTIRQ);
+    }
+
+    while ( atomic_read(&cpu_count) )
+    {
+        process_pending_softirqs();
+        cpu_relax();
+    }
+
+    put_cpu_maps();
 }
 
 /* Is batch a before batch b ? */
@@ -422,6 +443,13 @@  static void rcu_process_callbacks(void)
         rdp->process_callbacks = false;
         __rcu_process_callbacks(&rcu_ctrlblk, rdp);
     }
+
+    if ( atomic_read(&cpu_count) && !rdp->barrier_active )
+    {
+        rdp->barrier_active = true;
+        rcu_barrier_action();
+        rdp->barrier_active = false;
+    }
 }
 
 static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
diff --git a/xen/include/xen/rcupdate.h b/xen/include/xen/rcupdate.h
index 174d058113..87f35b7704 100644
--- a/xen/include/xen/rcupdate.h
+++ b/xen/include/xen/rcupdate.h
@@ -143,7 +143,7 @@  void rcu_check_callbacks(int cpu);
 void call_rcu(struct rcu_head *head, 
               void (*func)(struct rcu_head *head));
 
-int rcu_barrier(void);
+void rcu_barrier(void);
 
 void rcu_idle_enter(unsigned int cpu);
 void rcu_idle_exit(unsigned int cpu);
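
The recursion detection added in V2 (the barrier_active flag above)
guards against the following re-entrancy path, sketched here as a call
chain reconstructed from the hunks above:

    rcu_process_callbacks()              /* RCU_SOFTIRQ handler        */
        rcu_barrier_action()
            process_pending_softirqs()
                rcu_process_callbacks()  /* re-entered; without the
                                          * barrier_active check this
                                          * would start a second, nested
                                          * rcu_barrier_action()       */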