[v3,2/2] rwlock: allow recursive read locking when already locked in write mode
diff mbox series

Message ID 20200224084400.94482-3-roger.pau@citrix.com
State New
Headers show
Series
  • rwlock: allow recursive read locking when already locked in write mode
Related show

Commit Message

Roger Pau Monné Feb. 24, 2020, 8:44 a.m. UTC
Allow a CPU already holding the lock in write mode to also lock it in
read mode. There's no harm in allowing read locking a rwlock that's
already owned by the caller (ie: CPU) in write mode. Allowing such
accesses is required at least for the CPU maps use-case.

In order to do this reserve 12bits of the lock, this allows to support
up to 4096 CPUs. Also reduce the write lock mask to 2 bits: one to
signal there are pending writers waiting on the lock and the other to
signal the lock is owned in write mode.

This reduces the maximum number of concurrent readers from 16777216 to
262144, I think this should still be enough, or else the lock field
can be expanded from 32 to 64bits if all architectures support atomic
operations on 64bit integers.

Fixes: 5872c83b42c608 ('smp: convert the cpu maps lock into a rw lock')
Reported-by: Jan Beulich <jbeulich@suse.com>
Reported-by: Jürgen Groß <jgross@suse.com>
Signed-off-by: Roger Pau Monné <roger.pau@citrix.com>
---
Changes since v2:
 - Use atomic_and to clear the writers part of the lock when
   write-unlocking.
 - Reduce writer-related area to 14bits.
 - Include xen/smp.h for Arm64.

Changes since v1:
 - Move the processor ID field to start at bit 0 and hence avoid the
   shift when checking it.
 - Enlarge write related structures to use 16bit, and hence allow them
   to be cleared using an atomic write.
---
 xen/common/rwlock.c      |  4 ++--
 xen/include/xen/rwlock.h | 52 ++++++++++++++++++++++++----------------
 2 files changed, 34 insertions(+), 22 deletions(-)

Comments

Julien Grall Feb. 24, 2020, 10:05 a.m. UTC | #1
Hi Roger,

On 24/02/2020 08:44, Roger Pau Monne wrote:
> Allow a CPU already holding the lock in write mode to also lock it in
> read mode. There's no harm in allowing read locking a rwlock that's
> already owned by the caller (ie: CPU) in write mode. Allowing such
> accesses is required at least for the CPU maps use-case.
> 
> In order to do this reserve 12bits of the lock, this allows to support
> up to 4096 CPUs. Also reduce the write lock mask to 2 bits: one to
> signal there are pending writers waiting on the lock and the other to
> signal the lock is owned in write mode.
> 
> This reduces the maximum number of concurrent readers from 16777216 to
> 262144, I think this should still be enough, or else the lock field
> can be expanded from 32 to 64bits if all architectures support atomic
> operations on 64bit integers.
> 
> Fixes: 5872c83b42c608 ('smp: convert the cpu maps lock into a rw lock')
> Reported-by: Jan Beulich <jbeulich@suse.com>
> Reported-by: Jürgen Groß <jgross@suse.com>
> Signed-off-by: Roger Pau Monné <roger.pau@citrix.com>
> ---
> Changes since v2:
>   - Use atomic_and to clear the writers part of the lock when
>     write-unlocking.
>   - Reduce writer-related area to 14bits.
>   - Include xen/smp.h for Arm64.

OOI, is it to use smp_processor_id()?

Cheers,
Roger Pau Monné Feb. 24, 2020, 10:10 a.m. UTC | #2
On Mon, Feb 24, 2020 at 10:05:39AM +0000, Julien Grall wrote:
> Hi Roger,
> 
> On 24/02/2020 08:44, Roger Pau Monne wrote:
> > Allow a CPU already holding the lock in write mode to also lock it in
> > read mode. There's no harm in allowing read locking a rwlock that's
> > already owned by the caller (ie: CPU) in write mode. Allowing such
> > accesses is required at least for the CPU maps use-case.
> > 
> > In order to do this reserve 12bits of the lock, this allows to support
> > up to 4096 CPUs. Also reduce the write lock mask to 2 bits: one to
> > signal there are pending writers waiting on the lock and the other to
> > signal the lock is owned in write mode.
> > 
> > This reduces the maximum number of concurrent readers from 16777216 to
> > 262144, I think this should still be enough, or else the lock field
> > can be expanded from 32 to 64bits if all architectures support atomic
> > operations on 64bit integers.
> > 
> > Fixes: 5872c83b42c608 ('smp: convert the cpu maps lock into a rw lock')
> > Reported-by: Jan Beulich <jbeulich@suse.com>
> > Reported-by: Jürgen Groß <jgross@suse.com>
> > Signed-off-by: Roger Pau Monné <roger.pau@citrix.com>
> > ---
> > Changes since v2:
> >   - Use atomic_and to clear the writers part of the lock when
> >     write-unlocking.
> >   - Reduce writer-related area to 14bits.
> >   - Include xen/smp.h for Arm64.
> 
> OOI, is it to use smp_processor_id()?

Yes, or else I would get errors when building asm-offsets on Arm64
IIRC.

Thanks, Roger.
Jan Beulich Feb. 25, 2020, 3:23 p.m. UTC | #3
On 24.02.2020 09:44, Roger Pau Monne wrote:
> @@ -20,21 +21,30 @@ typedef struct {
>  #define DEFINE_RWLOCK(l) rwlock_t l = RW_LOCK_UNLOCKED
>  #define rwlock_init(l) (*(l) = (rwlock_t)RW_LOCK_UNLOCKED)
>  
> -/*
> - * Writer states & reader shift and bias.
> - *
> - * Writer field is 8 bit to allow for potential optimisation, see
> - * _write_unlock().
> - */
> -#define    _QW_WAITING  1               /* A writer is waiting     */
> -#define    _QW_LOCKED   0xff            /* A writer holds the lock */
> -#define    _QW_WMASK    0xff            /* Writer mask.*/
> -#define    _QR_SHIFT    8               /* Reader count shift      */
> +/* Writer states & reader shift and bias. */
> +#define    _QW_CPUMASK  0xfff               /* Writer CPU mask */
> +#define    _QW_SHIFT    12                  /* Writer flags shift */
> +#define    _QW_WAITING  (1u << _QW_SHIFT)   /* A writer is waiting */
> +#define    _QW_LOCKED   (3u << _QW_SHIFT)   /* A writer holds the lock */
> +#define    _QW_WMASK    (3u << _QW_SHIFT)   /* Writer mask */
> +#define    _QR_SHIFT    14                  /* Reader count shift */

In particular with the suggested change of atomic_and()'s first
parameter's type, perhaps the u suffixes want dropping here?

> +static inline bool _is_write_locked_by_me(uint32_t cnts)

Both here and ...

> +{
> +    BUILD_BUG_ON(_QW_CPUMASK < NR_CPUS);
> +    return (cnts & _QW_WMASK) == _QW_LOCKED &&
> +           (cnts & _QW_CPUMASK) == smp_processor_id();
> +}
> +
> +static inline bool _can_read_lock(uint32_t cnts)

... here, is a fixed width type really needed? I'd prefer if
"unsigned int" was used, and if eventually we'd then also
replace ...

> @@ -45,10 +55,10 @@ static inline int _read_trylock(rwlock_t *lock)
>      u32 cnts;

... this and ...

> @@ -64,7 +74,7 @@ static inline void _read_lock(rwlock_t *lock)
>      u32 cnts;

... this (and possible others).

> @@ -115,6 +125,11 @@ static inline int _rw_is_locked(rwlock_t *lock)
>      return atomic_read(&lock->cnts);
>  }
>  
> +static inline uint32_t _write_lock_val(void)

Same here then.

With these taken care of (as long as you agree, and likely doable
again while committing)
Reviewed-by: Jan Beulich <jbeulich@suse.com>

Jan
Julien Grall Feb. 25, 2020, 3:32 p.m. UTC | #4
On 24/02/2020 10:10, Roger Pau Monné wrote:
> On Mon, Feb 24, 2020 at 10:05:39AM +0000, Julien Grall wrote:
>> Hi Roger,
>>
>> On 24/02/2020 08:44, Roger Pau Monne wrote:
>>> Allow a CPU already holding the lock in write mode to also lock it in
>>> read mode. There's no harm in allowing read locking a rwlock that's
>>> already owned by the caller (ie: CPU) in write mode. Allowing such
>>> accesses is required at least for the CPU maps use-case.
>>>
>>> In order to do this reserve 12bits of the lock, this allows to support
>>> up to 4096 CPUs. Also reduce the write lock mask to 2 bits: one to
>>> signal there are pending writers waiting on the lock and the other to
>>> signal the lock is owned in write mode.
>>>
>>> This reduces the maximum number of concurrent readers from 16777216 to
>>> 262144, I think this should still be enough, or else the lock field
>>> can be expanded from 32 to 64bits if all architectures support atomic
>>> operations on 64bit integers.
>>>
>>> Fixes: 5872c83b42c608 ('smp: convert the cpu maps lock into a rw lock')
>>> Reported-by: Jan Beulich <jbeulich@suse.com>
>>> Reported-by: Jürgen Groß <jgross@suse.com>
>>> Signed-off-by: Roger Pau Monné <roger.pau@citrix.com>
>>> ---
>>> Changes since v2:
>>>    - Use atomic_and to clear the writers part of the lock when
>>>      write-unlocking.
>>>    - Reduce writer-related area to 14bits.
>>>    - Include xen/smp.h for Arm64.
>>
>> OOI, is it to use smp_processor_id()?
> 
> Yes, or else I would get errors when building asm-offsets on Arm64
> IIRC.

Thank you for the confirmation.

Reviewed-by: Julien Grall <julien@xen.org>

Cheers,

> 
> Thanks, Roger.
>
Roger Pau Monné Feb. 25, 2020, 3:59 p.m. UTC | #5
On Tue, Feb 25, 2020 at 04:23:34PM +0100, Jan Beulich wrote:
> On 24.02.2020 09:44, Roger Pau Monne wrote:
> > @@ -20,21 +21,30 @@ typedef struct {
> >  #define DEFINE_RWLOCK(l) rwlock_t l = RW_LOCK_UNLOCKED
> >  #define rwlock_init(l) (*(l) = (rwlock_t)RW_LOCK_UNLOCKED)
> >  
> > -/*
> > - * Writer states & reader shift and bias.
> > - *
> > - * Writer field is 8 bit to allow for potential optimisation, see
> > - * _write_unlock().
> > - */
> > -#define    _QW_WAITING  1               /* A writer is waiting     */
> > -#define    _QW_LOCKED   0xff            /* A writer holds the lock */
> > -#define    _QW_WMASK    0xff            /* Writer mask.*/
> > -#define    _QR_SHIFT    8               /* Reader count shift      */
> > +/* Writer states & reader shift and bias. */
> > +#define    _QW_CPUMASK  0xfff               /* Writer CPU mask */
> > +#define    _QW_SHIFT    12                  /* Writer flags shift */
> > +#define    _QW_WAITING  (1u << _QW_SHIFT)   /* A writer is waiting */
> > +#define    _QW_LOCKED   (3u << _QW_SHIFT)   /* A writer holds the lock */
> > +#define    _QW_WMASK    (3u << _QW_SHIFT)   /* Writer mask */
> > +#define    _QR_SHIFT    14                  /* Reader count shift */
> 
> In particular with the suggested change of atomic_and()'s first
> parameter's type, perhaps the u suffixes want dropping here?

That would be fine. But seeing as we are planning on handling the
result of atomic_read as an unsigned int I'm not sure if it wold be
better to also keep those as unsigned ints.

> > +static inline bool _is_write_locked_by_me(uint32_t cnts)
> 
> Both here and ...
> 
> > +{
> > +    BUILD_BUG_ON(_QW_CPUMASK < NR_CPUS);
> > +    return (cnts & _QW_WMASK) == _QW_LOCKED &&
> > +           (cnts & _QW_CPUMASK) == smp_processor_id();
> > +}
> > +
> > +static inline bool _can_read_lock(uint32_t cnts)
> 
> ... here, is a fixed width type really needed? I'd prefer if
> "unsigned int" was used, and if eventually we'd then also
> replace ...

The code uniformly uses uint32_t as the cnts type, I'm fine with
switching to unsigned int, I've used uint32_t to keep it coherent with
the rest of the code.

> > @@ -45,10 +55,10 @@ static inline int _read_trylock(rwlock_t *lock)
> >      u32 cnts;
> 
> ... this and ...
> 
> > @@ -64,7 +74,7 @@ static inline void _read_lock(rwlock_t *lock)
> >      u32 cnts;
> 
> ... this (and possible others).
> 
> > @@ -115,6 +125,11 @@ static inline int _rw_is_locked(rwlock_t *lock)
> >      return atomic_read(&lock->cnts);
> >  }
> >  
> > +static inline uint32_t _write_lock_val(void)
> 
> Same here then.
> 
> With these taken care of (as long as you agree, and likely doable
> again while committing)
> Reviewed-by: Jan Beulich <jbeulich@suse.com>

Thanks, feel free to adjust on commit, or else I can send a new
version.

Roger.

Patch
diff mbox series

diff --git a/xen/common/rwlock.c b/xen/common/rwlock.c
index d568bbf6de..dadab372b5 100644
--- a/xen/common/rwlock.c
+++ b/xen/common/rwlock.c
@@ -69,7 +69,7 @@  void queue_write_lock_slowpath(rwlock_t *lock)
 
     /* Try to acquire the lock directly if no reader is present. */
     if ( !atomic_read(&lock->cnts) &&
-         (atomic_cmpxchg(&lock->cnts, 0, _QW_LOCKED) == 0) )
+         (atomic_cmpxchg(&lock->cnts, 0, _write_lock_val()) == 0) )
         goto unlock;
 
     /*
@@ -93,7 +93,7 @@  void queue_write_lock_slowpath(rwlock_t *lock)
         cnts = atomic_read(&lock->cnts);
         if ( (cnts == _QW_WAITING) &&
              (atomic_cmpxchg(&lock->cnts, _QW_WAITING,
-                             _QW_LOCKED) == _QW_WAITING) )
+                             _write_lock_val()) == _QW_WAITING) )
             break;
 
         cpu_relax();
diff --git a/xen/include/xen/rwlock.h b/xen/include/xen/rwlock.h
index 3dfea1ac2a..82af36b1b4 100644
--- a/xen/include/xen/rwlock.h
+++ b/xen/include/xen/rwlock.h
@@ -2,6 +2,7 @@ 
 #define __RWLOCK_H__
 
 #include <xen/percpu.h>
+#include <xen/smp.h>
 #include <xen/spinlock.h>
 
 #include <asm/atomic.h>
@@ -20,21 +21,30 @@  typedef struct {
 #define DEFINE_RWLOCK(l) rwlock_t l = RW_LOCK_UNLOCKED
 #define rwlock_init(l) (*(l) = (rwlock_t)RW_LOCK_UNLOCKED)
 
-/*
- * Writer states & reader shift and bias.
- *
- * Writer field is 8 bit to allow for potential optimisation, see
- * _write_unlock().
- */
-#define    _QW_WAITING  1               /* A writer is waiting     */
-#define    _QW_LOCKED   0xff            /* A writer holds the lock */
-#define    _QW_WMASK    0xff            /* Writer mask.*/
-#define    _QR_SHIFT    8               /* Reader count shift      */
+/* Writer states & reader shift and bias. */
+#define    _QW_CPUMASK  0xfff               /* Writer CPU mask */
+#define    _QW_SHIFT    12                  /* Writer flags shift */
+#define    _QW_WAITING  (1u << _QW_SHIFT)   /* A writer is waiting */
+#define    _QW_LOCKED   (3u << _QW_SHIFT)   /* A writer holds the lock */
+#define    _QW_WMASK    (3u << _QW_SHIFT)   /* Writer mask */
+#define    _QR_SHIFT    14                  /* Reader count shift */
 #define    _QR_BIAS     (1U << _QR_SHIFT)
 
 void queue_read_lock_slowpath(rwlock_t *lock);
 void queue_write_lock_slowpath(rwlock_t *lock);
 
+static inline bool _is_write_locked_by_me(uint32_t cnts)
+{
+    BUILD_BUG_ON(_QW_CPUMASK < NR_CPUS);
+    return (cnts & _QW_WMASK) == _QW_LOCKED &&
+           (cnts & _QW_CPUMASK) == smp_processor_id();
+}
+
+static inline bool _can_read_lock(uint32_t cnts)
+{
+    return !(cnts & _QW_WMASK) || _is_write_locked_by_me(cnts);
+}
+
 /*
  * _read_trylock - try to acquire read lock of a queue rwlock.
  * @lock : Pointer to queue rwlock structure.
@@ -45,10 +55,10 @@  static inline int _read_trylock(rwlock_t *lock)
     u32 cnts;
 
     cnts = atomic_read(&lock->cnts);
-    if ( likely(!(cnts & _QW_WMASK)) )
+    if ( likely(_can_read_lock(cnts)) )
     {
         cnts = (u32)atomic_add_return(_QR_BIAS, &lock->cnts);
-        if ( likely(!(cnts & _QW_WMASK)) )
+        if ( likely(_can_read_lock(cnts)) )
             return 1;
         atomic_sub(_QR_BIAS, &lock->cnts);
     }
@@ -64,7 +74,7 @@  static inline void _read_lock(rwlock_t *lock)
     u32 cnts;
 
     cnts = atomic_add_return(_QR_BIAS, &lock->cnts);
-    if ( likely(!(cnts & _QW_WMASK)) )
+    if ( likely(_can_read_lock(cnts)) )
         return;
 
     /* The slowpath will decrement the reader count, if necessary. */
@@ -115,6 +125,11 @@  static inline int _rw_is_locked(rwlock_t *lock)
     return atomic_read(&lock->cnts);
 }
 
+static inline uint32_t _write_lock_val(void)
+{
+    return _QW_LOCKED | smp_processor_id();
+}
+
 /*
  * queue_write_lock - acquire write lock of a queue rwlock.
  * @lock : Pointer to queue rwlock structure.
@@ -122,7 +137,7 @@  static inline int _rw_is_locked(rwlock_t *lock)
 static inline void _write_lock(rwlock_t *lock)
 {
     /* Optimize for the unfair lock case where the fair flag is 0. */
-    if ( atomic_cmpxchg(&lock->cnts, 0, _QW_LOCKED) == 0 )
+    if ( atomic_cmpxchg(&lock->cnts, 0, _write_lock_val()) == 0 )
         return;
 
     queue_write_lock_slowpath(lock);
@@ -157,16 +172,13 @@  static inline int _write_trylock(rwlock_t *lock)
     if ( unlikely(cnts) )
         return 0;
 
-    return likely(atomic_cmpxchg(&lock->cnts, 0, _QW_LOCKED) == 0);
+    return likely(atomic_cmpxchg(&lock->cnts, 0, _write_lock_val()) == 0);
 }
 
 static inline void _write_unlock(rwlock_t *lock)
 {
-    /*
-     * If the writer field is atomic, it can be cleared directly.
-     * Otherwise, an atomic subtraction will be used to clear it.
-     */
-    atomic_sub(_QW_LOCKED, &lock->cnts);
+    ASSERT(_is_write_locked_by_me(atomic_read(&lock->cnts)));
+    atomic_and(~(_QW_CPUMASK | _QW_WMASK), &lock->cnts);
 }
 
 static inline void _write_unlock_irq(rwlock_t *lock)