Add next, prev pointer in xa_node at the lib/xarray.c

Message ID CAHOvCC7ZLpOkdWPjY3art8LYOh2SJWwgqYRHRMVm-E4-kD06mA@mail.gmail.com (mailing list archive)
State New, archived
Series Add next, prev pointer in xa_node at the lib/xarray.c

Commit Message

JaeJoon Jung March 17, 2020, 7:32 a.m. UTC
Hi Matthew,
I add next, prev pointer in the xa_node to improve search performance.
XArray currently has the following search capabilities:

Search algorithm performance is O(n) = n x log(n)

For example,
XA_CHUNK_SHIFT 2
XA_CHUNK_SIZE 4
===============================================================================
xarray->head : xa_node : index: 0  16  32  48
                              +---+---+---+---+
                                |   |   |   |
        +-----------------------+   |   |   +------------+
        |              +------------+   |                |
        |              |                |                |
    0  4  8  12    16  20  24  28   32  36  40  44   48  52  56  60
    +--+--+---+    +---+---+---+    +---+---+---+    +---+---+---+
        |              |                |                |
  +-----+              |                +-------+        +---------------+
  |                    |                        |                        |
  0 1 2 3    ....      16 17 18 19    ....      32 33 34 35    ....      48 49 ..

xas_for_each(xas, entry, max) \
    for (entry = xas_find(xas, max); entry; \
         entry = xas_next_entry(xas, max))

O(n) = n x log(n)
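
For reference, a typical forward walk with the current API looks like
this (editorial sketch; only existing mainline calls are used):

#include <linux/xarray.h>

static DEFINE_XARRAY(example_xa);

static unsigned long walk_baseline(void)
{
        XA_STATE(xas, &example_xa, 0);
        unsigned long seen = 0;
        void *entry;

        xa_store(&example_xa, 1, xa_mk_value(1), GFP_KERNEL);
        xa_store(&example_xa, 1000, xa_mk_value(1000), GFP_KERNEL);

        rcu_read_lock();
        xas_for_each(&xas, entry, ULONG_MAX)
                seen++;         /* crossing from index 1 to 1000 walks back up the tree */
        rcu_read_unlock();

        return seen;            /* 2 */
}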

The above search slows down significantly if the indexes are not
densely packed.
But,
If you connect the leaf node with the next and prev pointers as follows,
the search performance is improved to O (n) = n.
The improvement is larger when the indexes are not dense.
I've added test code to tools/testing/radix-tree/benchmark.c
to exercise this on Linux v5.6-rc6,
and I have seen a 5% to 20% improvement in elapsed time.

XA_CHUNK_SHIFT 2
XA_CHUNK_SIZE  4
with next, prev pointers added to the leaf nodes
===============================================================================
xarray->head : xa_node : index: 0  16  32  48
                              +---+---+---+---+
                                |   |   |   |
        +-----------------------+   |   |   +------------+
        |              +------------+   |                |
        |              |                |                |
    0  4  8  12    16  20  24  28   32  36  40  44   48  52  56  60
    +--+--+---+    +---+---+---+    +---+---+---+    +---+---+---+
        |              |                |                |
  +-----+              |                +-------+        +---------------+
  |                    |                        |                        |
  0 1 2 3 <==>... <==> 16 17 18 19 <==>... <==> 32 33 34 35 <==>... <==> 48 49 ..

next loop:
xas_for_each_next_fast(xas, entry, max) \
    for (entry = xas_find(xas, max); entry; \
         entry = xas_next_fast(xas, max))
O(n) = n

prev loop:
xas_for_each_prev_fast(xas, entry, min) \
    for (entry = xas_find(xas, min); entry; \
         entry = xas_prev_fast(xas, min))
O(n) = n
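
As a usage sketch (assuming this patch is applied;
xas_for_each_next_fast() does not exist upstream), the new macro is
meant as a drop-in replacement for xas_for_each():

#include <linux/xarray.h>

static DEFINE_XARRAY(example_xa);

static unsigned long walk_fast(void)
{
        XA_STATE(xas, &example_xa, 0);
        unsigned long seen = 0;
        void *entry;

        rcu_read_lock();
        xas_for_each_next_fast(&xas, entry, ULONG_MAX)
                seen++;         /* leaf-to-leaf moves follow node->next */
        rcu_read_unlock();

        return seen;
}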

===============================================================================

 include/linux/xarray.h               |  96 +++++++
 lib/xarray.c                         | 136 ++++++++++
 tools/testing/radix-tree/benchmark.c |  78 ++++++
 tools/testing/radix-tree/linux.c     |   4 +
 tools/testing/radix-tree/main.c      |   1 +
 tools/testing/radix-tree/test.h      |   1 +

===============================================================================

Please check the above.
Best Regards,
JaeJoon Jung.

Comments

Matthew Wilcox March 17, 2020, 1:40 p.m. UTC | #1
On Tue, Mar 17, 2020 at 04:32:34PM +0900, JaeJoon Jung wrote:
> Hi Matthew,
> I add next, prev pointer in the xa_node to improve search performance.
> XArray currently has the following search capabilities:
> 
> Search algorithm performance is O(n) = n x log(n)

That's not how "big-O" notation is used.
https://en.wikipedia.org/wiki/Big_O_notation

What you mean to say here is O(n.log(n)).

> For example,
> XA_CHUNK_SHIFT 2
> XA_CHUNK_SIZE 4

I'm not really interested in the performance of a cut-down radix tree.
You can re-run the numbers with a SHIFT of 6 and SIZE of 64 to get a
more useful set of performance numbers.

> If you connect the leaf node with the next and prev pointers as follows,
> the search performance is improved to O (n) = n.

But that's not free.  Right now, the xa_node is finely tuned in
size to 576 bytes on 64-bit systems (32-bit systems are a secondary
consideration).  That gets us 7 nodes per page.  Increasing the size any
further reduces the number per page to 6, which is a pretty meaningful
increase in memory usage.
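
A quick back-of-the-envelope check of that arithmetic (editorial
sketch, assuming a 4096-byte page and 8-byte pointers on 64-bit):

#include <stdio.h>

int main(void)
{
        unsigned long page = 4096;
        unsigned long cur = 576;                        /* sizeof(struct xa_node) today */
        unsigned long grown = 576 + 2 * sizeof(void *); /* with prev and next added */

        printf("today: %lu nodes per page\n", page / cur);     /* 7 */
        printf("grown: %lu nodes per page\n", page / grown);   /* 6 */
        return 0;
}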

> @@ -1125,6 +1125,8 @@ struct xa_node {
>         unsigned char   count;          /* Total entry count */
>         unsigned char   nr_values;      /* Value entry count */
>         struct xa_node __rcu *parent;   /* NULL at top of tree */
> +        struct xa_node __rcu *prev;     /* previous node pointer */
> +        struct xa_node __rcu *next;     /* next node pointer */

You should be indenting with tabs, not spaces.  Or your mail system is
messing with your patches.
JaeJoon Jung March 18, 2020, 12:49 a.m. UTC | #2
Hi Matthew,
Thank you for your detailed response.

I think the XArray is a very well-designed structure that needs no
further improvement when the indexes are dense on a 64-bit system.

Since xa_node is 576 bytes, sized so that 7 of them fit in a 4kB page,
adding more fields to xa_node does create a memory-overhead problem.

#define XA_CHUNK_SHIFT          (CONFIG_BASE_SMALL ? 4 : 6)

XA_CHUNK_SIZE is therefore 16 or 64, and the XArray is currently tuned
for 64, so modifying that may be counterproductive.

The well-designed XArray still has a downside: when the indexes are not
dense, or many entries have been deleted, the slots in each xa_node are
poorly utilized and the log(n) cost of stepping between leaves keeps the
search at f(n) = O(n log n).

This is my concern, and I hope you understand my efforts to solve it.

The approach described below matters most when XA_CHUNK_SHIFT is 4 or
2, and it addresses the slowdown in search speed when indexes have been
deleted or are not densely clustered.

When XA_CHUNK_SHIFT is 2 (XA_CHUNK_SIZE is 4), the XArray is laid out
as follows.
(It is difficult to express the picture in text.)

xarray->head : xa_node : index: 0  16  32  48
                              +---+---+---+---+
                                |   |   |   |
        +-----------------------+   |   |   +------------+
        |              +------------+   |                |
        |              |                |                |
    0  4  8  12    16  20  24  28   32  36  40  44   48  52  56  60
    +--+--+---+    +---+---+---+    +---+---+---+    +---+---+---+
        |              |                |                |
  +-----+              |                +-------+        +---------------+
  |                    |                        |                        |
  0 1 2 3 <==>... <==> 16 17 18 19 <==>... <==> 32 33 34 35 <==>... <==> 48 49 ..

In the above, if the xa_nodes are connected with next and prev
pointers, the walk never has to climb back through the parent nodes, so
the log(n) factor disappears from f(n) = O(n log n) and the search
becomes f(n) = O(n). This is, of course, a well-known technique from
classic data structures, but I hope you can see the effort that went
into applying it to the XArray.
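
A simplified sketch of that difference (editorial illustration with
hypothetical names, not the kernel code; XA_CHUNK_SIZE is taken as 4,
slots are assumed to hold child nodes, and every interior node is
assumed to have slot 0 populated):

struct toy_node {
        struct toy_node *parent;
        struct toy_node *next;          /* the proposed leaf-to-leaf link */
        unsigned char offset;           /* this node's slot index in its parent */
        unsigned char shift;            /* 0 for leaf nodes */
        struct toy_node *slots[4];
};

/* Without links: O(tree height) work per leaf-boundary crossing. */
static struct toy_node *next_leaf_via_parent(struct toy_node *leaf)
{
        struct toy_node *node = leaf->parent;
        unsigned int offset = leaf->offset + 1;

        while (node) {
                for (; offset < 4; offset++) {
                        struct toy_node *child = node->slots[offset];

                        if (!child)
                                continue;
                        while (child->shift)    /* descend to the leftmost leaf */
                                child = child->slots[0];
                        return child;
                }
                offset = node->offset + 1;      /* climb and keep scanning */
                node = node->parent;
        }
        return NULL;
}

/* With links: one pointer dereference per crossing. */
static struct toy_node *next_leaf_via_link(struct toy_node *leaf)
{
        return leaf->next;
}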

Best Regards,
JaeJoon Jung.


Patch

diff --git a/include/linux/xarray.h b/include/linux/xarray.h
index f73e1775ded0..bcc78459acde 100644
--- a/include/linux/xarray.h
+++ b/include/linux/xarray.h
@@ -1125,6 +1125,8 @@  struct xa_node {
        unsigned char   count;          /* Total entry count */
        unsigned char   nr_values;      /* Value entry count */
        struct xa_node __rcu *parent;   /* NULL at top of tree */
+        struct xa_node __rcu *prev;     /* previous node pointer */
+        struct xa_node __rcu *next;     /* next node pointer */
        struct xarray   *array;         /* The array we belong to */
        union {
                struct list_head private_list;  /* For tree user */
@@ -1206,6 +1208,38 @@  static inline struct xa_node *xa_parent_locked(const struct xarray *xa,
                                                lockdep_is_held(&xa->xa_lock));
 }

+/* Private */
+static inline struct xa_node *xa_prev(const struct xarray *xa,
+                                        const struct xa_node *node)
+{
+        return rcu_dereference_check(node->prev,
+                                                lockdep_is_held(&xa->xa_lock));
+}
+
+/* Private */
+static inline struct xa_node *xa_prev_locked(const struct xarray *xa,
+                                        const struct xa_node *node)
+{
+        return rcu_dereference_protected(node->prev,
+                                                lockdep_is_held(&xa->xa_lock));
+}
+
+/* Private */
+static inline struct xa_node *xa_next(const struct xarray *xa,
+                                        const struct xa_node *node)
+{
+        return rcu_dereference_check(node->next,
+                                                lockdep_is_held(&xa->xa_lock));
+}
+
+/* Private */
+static inline struct xa_node *xa_next_locked(const struct xarray *xa,
+                                        const struct xa_node *node)
+{
+        return rcu_dereference_protected(node->next,
+                                                lockdep_is_held(&xa->xa_lock));
+}
+
 /* Private */
 static inline void *xa_mk_node(const struct xa_node *node)
 {
@@ -1611,6 +1645,60 @@  static inline void *xas_next_entry(struct xa_state *xas, unsigned long max)
        return entry;
 }

+/* Private */
+static inline void *xas_next_fast(struct xa_state *xas, unsigned long max)
+{
+        struct xa_node *node = xas->xa_node;
+        int offset = xas->xa_offset;
+        void *entry;
+
+        do {
+                if (unlikely(xas->xa_index >= max))
+                        return xas_find(xas, max);
+                if (unlikely(xas->xa_offset == XA_CHUNK_MASK)) {
+                        node = node->next;
+                        xas->xa_node = node;
+                        offset = -1;
+                }
+                if (unlikely(xas_not_node(node)))
+                        return xas_find(xas, max);
+                entry = xa_entry(xas->xa, node, offset + 1);
+                if (unlikely(xa_is_internal(entry)))
+                        return xas_find(xas, max);
+                offset++;
+                xas->xa_offset = offset;
+                xas->xa_index = xa_to_value(entry);
+        } while (!entry);
+
+        return entry;
+}
+
+/* Private */
+static inline void *xas_prev_fast(struct xa_state *xas, unsigned long max)
+{
+       struct xa_node *node = xas->xa_node;
+       void *entry;
+
+       do {
+                if (unlikely(xas->xa_index > max))
+                       return xas_find(xas, max);
+                if (unlikely(xas->xa_offset == 0)) {
+                        node = node->prev;
+                        xas->xa_node = node;
+                        xas->xa_offset = XA_CHUNK_SIZE;
+                }
+                if (unlikely(xas_not_node(node)))
+                        return xas_find(xas, max);
+               entry = xa_entry(xas->xa, node, xas->xa_offset - 1);
+               if (unlikely(xa_is_internal(entry)))
+                        return xas_find(xas, max);
+               xas->xa_offset--;
+               xas->xa_index = xa_to_value(entry);
+       } while (!entry);
+
+       return entry;
+}
+
 /* Private */
 static inline unsigned int xas_find_chunk(struct xa_state *xas, bool advance,
                xa_mark_t mark)
@@ -1687,6 +1775,14 @@  enum {
        for (entry = xas_find(xas, max); entry; \
             entry = xas_next_entry(xas, max))

+#define xas_for_each_next_fast(xas, entry, max) \
+        for (entry = xas_find(xas, max); entry; \
+             entry = xas_next_fast(xas, max))
+
+#define xas_for_each_prev_fast(xas, entry, min) \
+        for (entry = xas_find(xas, min); entry; \
+             entry = xas_prev_fast(xas, min))
+
 /**
  * xas_for_each_marked() - Iterate over a range of an XArray.
  * @xas: XArray operation state.

===============================================================================

diff --git a/lib/xarray.c b/lib/xarray.c
index 1d9fab7db8da..92a129560b22 100644
--- a/lib/xarray.c
+++ b/lib/xarray.c
@@ -258,6 +258,106 @@  static void xa_node_free(struct xa_node *node)
        call_rcu(&node->rcu_head, radix_tree_node_rcu_free);
 }

+/*
+ * xas_node_find_prev() - find the previous leaf node, starting from @parent.
+ * @xas: XArray operation state.
+ * @parent: parent node.
+ * @offset: starting offset.
+ *
+ * This function is called from xas_alloc().
+ */
+static struct xa_node *xas_node_find_prev(struct xa_state *xas,
+                struct xa_node *parent, int offset)
+{
+        void *entry;
+        struct xa_node *prev;
+
+        while (parent) {
+                prev = NULL;
+                while (offset >= 0) {
+                        entry = xa_entry(xas->xa, parent, offset);
+                        if (xa_is_node(entry)) {
+                                prev = xa_to_node(entry);
+                                break;
+                        }
+                        offset--;
+                }
+
+                if (offset < 0) {
+                        offset = parent->offset - 1;
+                        parent = xa_parent_locked(xas->xa, parent);
+                } else if (prev) {
+                        if (prev->shift==0)
+                                return prev;
+                        offset = XA_CHUNK_MASK;
+                        parent = prev;
+                }
+        }
+        return NULL;
+}
+
+/*
+ * xas_node_find_next() - find the next leaf node, starting from @parent.
+ * @xas: XArray operation state.
+ * @parent: parent node.
+ * @offset: starting offset.
+ *
+ * This function is called from xas_alloc().
+ */
+static struct xa_node *xas_node_find_next(struct xa_state *xas,
+                struct xa_node *parent, int offset)
+{
+        void *entry;
+        struct xa_node *next;
+
+        while (parent) {
+                next = NULL;
+                while (offset < XA_CHUNK_SIZE) {
+                        entry = xa_entry(xas->xa, parent, offset);
+                        if (xa_is_node(entry)) {
+                                next = xa_to_node(entry);
+                                break;
+                        }
+                        offset++;
+                }
+                if (next) {
+                        if (next->shift==0)
+                                return next;
+                        offset = 0;
+                        parent = next;
+                } else {
+                        offset = parent->offset + 1;
+                        parent = xa_parent_locked(xas->xa, parent);
+                }
+        }
+        return NULL;
+}
+
+/*
+ * xas_node_delete_link() - unlink the node from its previous and next nodes.
+ * @xas: XArray operation state.
+ * @node: node being deleted.
+ *
+ * This function is called before xa_node_free().
+ * node->prev->next = node->next
+ * node->next->prev = node->prev
+ */
+static void xas_node_delete_link(struct xa_state *xas, struct xa_node *node)
+{
+        struct xa_node *prev, *next;
+
+        if (node->shift == 0) {
+                prev = xa_prev_locked(xas->xa, node);
+                next = xa_next_locked(xas->xa, node);
+                if (!xas_not_node(prev))
+                        RCU_INIT_POINTER(prev->next, next);
+                if (!xas_not_node(next))
+                        RCU_INIT_POINTER(next->prev, prev);
+        }
+        node->prev = NULL;
+        node->next = NULL;
+}
+
 /*
  * xas_destroy() - Free any resources allocated during the XArray operation.
  * @xas: XArray operation state.
@@ -389,6 +489,29 @@  static void *xas_alloc(struct xa_state *xas, unsigned int shift)
        RCU_INIT_POINTER(node->parent, xas->xa_node);
        node->array = xas->xa;

+        /*
+         * link node to previous and next after alloc
+         */
+         if (parent) {
+                struct xa_node *prev, *next;
+
+                prev = xas_node_find_prev(xas, parent, node->offset-1);
+                if (xas_not_node(prev)) {
+                        RCU_INIT_POINTER(parent->prev, node);
+                } else {
+                        RCU_INIT_POINTER(node->prev, prev);
+                        RCU_INIT_POINTER(prev->next, node);
+                }
+
+                next = xas_node_find_next(xas, parent, node->offset+1);
+                if (xas_not_node(next)) {
+                        RCU_INIT_POINTER(parent->next, node);
+                } else {
+                        RCU_INIT_POINTER(node->next, next);
+                        RCU_INIT_POINTER(next->prev, node);
+                }
+        }
+
        return node;
 }

@@ -459,7 +582,10 @@  static void xas_shrink(struct xa_state *xas)
                if (!xa_is_node(entry))
                        RCU_INIT_POINTER(node->slots[0], XA_RETRY_ENTRY);
                xas_update(xas, node);
+
+                xas_node_delete_link(xas, node);
                xa_node_free(node);
+
                if (!xa_is_node(entry))
                        break;
                node = xa_to_node(entry);
@@ -488,6 +614,8 @@  static void xas_delete_node(struct xa_state *xas)
                parent = xa_parent_locked(xas->xa, node);
                xas->xa_node = parent;
                xas->xa_offset = node->offset;
+
+                xas_node_delete_link(xas, node);
                xa_node_free(node);

                if (!parent) {
@@ -540,7 +668,10 @@  static void xas_free_nodes(struct xa_state *xas, struct xa_node *top)
                        node->count = 0;
                        node->nr_values = 0;
                        xas_update(xas, node);
+
+                        xas_node_delete_link(xas, node);
                        xa_node_free(node);
+
                        if (node == top)
                                return;
                        node = parent;
@@ -607,6 +738,11 @@  static int xas_expand(struct xa_state *xas, void *head)
                if (xa_is_node(head)) {
                        xa_to_node(head)->offset = 0;
                        rcu_assign_pointer(xa_to_node(head)->parent, node);
+                        /*
+                         * link node to previous and next after expand.
+                         */
+                        rcu_assign_pointer(node->prev, xa_to_node(head));
+                        rcu_assign_pointer(node->next, xa_to_node(head));
                }
                head = xa_mk_node(node);
                rcu_assign_pointer(xa->xa_head, head);

===============================================================================
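
The unlink step in xas_node_delete_link() above is the standard
doubly-linked-list removal, restated as a minimal generic sketch
(editorial illustration with hypothetical names, not the kernel code):

struct toy_node {
        struct toy_node *prev, *next;
};

static void toy_unlink(struct toy_node *node)
{
        if (node->prev)
                node->prev->next = node->next;  /* bypass node going forward */
        if (node->next)
                node->next->prev = node->prev;  /* bypass node going backward */
        node->prev = NULL;
        node->next = NULL;
}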

diff --git a/tools/testing/radix-tree/benchmark.c b/tools/testing/radix-tree/benchmark.c
index 523c79f22ed3..3ed52942185c 100644
--- a/tools/testing/radix-tree/benchmark.c
+++ b/tools/testing/radix-tree/benchmark.c
@@ -148,3 +148,81 @@  void benchmark(void)
                for (s = 0; step[s]; s++)
                        benchmark_size(size[c], step[s]);
 }
+
+static long benchmark_xas_size(struct xarray *xa,
+        unsigned long size, unsigned long skip)
+{
+        unsigned long i, store=0, erase=0;
+
+        for (i = 0; i < size; i++) {
+               xa_store(xa, i, xa_mk_value(i & LONG_MAX), GFP_KERNEL);
+               store++;
+        }
+        for (i = 0; i < size; i++) {
+                if (!(i % skip)) continue;
+                xa_erase(xa, i);
+                erase++;
+        }
+        return store - erase;
+}
+
+void benchmark_xas_perf(void)
+{
+        static DEFINE_XARRAY(xarray);
+        static XA_STATE(xas, &xarray, 0);
+       void *entry;
+       unsigned long index, value;
+       unsigned long seen=0;
+
+       struct timespec start, finish;
+        long store, nsec1, nsec2;
+        const unsigned long size = 1 << 20;
+        const unsigned long skip = 7;
+
+        printv(1, "starting benchmark_xas_perf\n");
+
+        store = benchmark_xas_size(&xarray, size, skip);
+
+        clock_gettime(CLOCK_MONOTONIC, &start);
+        rcu_read_lock();
+        xas_for_each(&xas, entry, ULONG_MAX) {
+                value = xa_to_value(entry);
+                index = value % skip;
+                WARN_ON(index);
+               seen++;
+       }
+        rcu_read_unlock();
+        clock_gettime(CLOCK_MONOTONIC, &finish);
+        nsec1 = (finish.tv_sec - start.tv_sec) * NSEC_PER_SEC +
+              (finish.tv_nsec - start.tv_nsec);
+        printv(2, "store=%ld, seen=%lu, elapsed=%lu(ns)\n",
+                        store, seen, nsec1);
+        WARN_ON(store != seen);
+
+        xa_destroy(&xarray);
+        store = benchmark_xas_size(&xarray, size, skip);
+
+        seen = 0;
+        xas_set(&xas, 0);
+        clock_gettime(CLOCK_MONOTONIC, &start);
+        rcu_read_lock();
+        xas_for_each_next_fast(&xas, entry, ULONG_MAX) {
+                value = xa_to_value(entry);
+                index = value % skip;
+                WARN_ON(index);
+               seen++;
+        }
+        rcu_read_unlock();
+        clock_gettime(CLOCK_MONOTONIC, &finish);
+        nsec2 = (finish.tv_sec - start.tv_sec) * NSEC_PER_SEC +
+              (finish.tv_nsec - start.tv_nsec);
+        printv(2, "store=%ld, seen=%lu, elapsed=%lu(ns)\n",
+                        store, seen, nsec2);
+        WARN_ON(store != seen);
+
+        xa_destroy(&xarray);
+
+        printv(2, "perf=%f(%%)\n",
+                ((double)(nsec2-nsec1) / (double)(nsec2+nsec1)) * 100);
+}
+

===============================================================================
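
The benchmark above times each walk with CLOCK_MONOTONIC; a standalone
user-space sketch of that timing pattern (editorial illustration; the
real harness links against the radix-tree test framework):

#include <stdio.h>
#include <time.h>

#define NSEC_PER_SEC 1000000000L

static long elapsed_ns(const struct timespec *a, const struct timespec *b)
{
        return (b->tv_sec - a->tv_sec) * NSEC_PER_SEC + (b->tv_nsec - a->tv_nsec);
}

int main(void)
{
        struct timespec start, finish;
        volatile unsigned long sum = 0;
        unsigned long i;

        clock_gettime(CLOCK_MONOTONIC, &start);
        for (i = 0; i < (1UL << 20); i++)       /* stand-in for the XArray walk */
                sum += i;
        clock_gettime(CLOCK_MONOTONIC, &finish);

        printf("elapsed=%ld ns (sum=%lu)\n", elapsed_ns(&start, &finish), sum);
        return 0;
}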

diff --git a/tools/testing/radix-tree/linux.c b/tools/testing/radix-tree/linux.c
index 44a0d1ad4408..4ba3be16aa7c 100644
--- a/tools/testing/radix-tree/linux.c
+++ b/tools/testing/radix-tree/linux.c
@@ -45,6 +45,8 @@  void *kmem_cache_alloc(struct kmem_cache *cachep, int flags)
                if (cachep->ctor)
                        cachep->ctor(node);
        }
+        node->prev = NULL;
+        node->next = NULL;

        uatomic_inc(&nr_allocated);
        if (kmalloc_verbose)
@@ -67,6 +69,8 @@  void kmem_cache_free(struct kmem_cache *cachep, void *objp)
                cachep->nr_objs++;
                node->parent = cachep->objs;
                cachep->objs = node;
+                node->prev = NULL;
+                node->next = NULL;
        }
        pthread_mutex_unlock(&cachep->lock);
 }

===============================================================================

diff --git a/tools/testing/radix-tree/main.c b/tools/testing/radix-tree/main.c
index 7a22d6e3732e..120d72fc6b8e 100644
--- a/tools/testing/radix-tree/main.c
+++ b/tools/testing/radix-tree/main.c
@@ -317,6 +317,7 @@  int main(int argc, char **argv)
        radix_tree_cpu_dead(0);

        benchmark();
+        benchmark_xas_perf();

        rcu_barrier();
        printv(2, "after rcu_barrier: %d allocated, preempt %d\n",

===============================================================================

diff --git a/tools/testing/radix-tree/test.h b/tools/testing/radix-tree/test.h
index 1ee4b2c0ad10..ce89f0c6b524 100644
--- a/tools/testing/radix-tree/test.h
+++ b/tools/testing/radix-tree/test.h
@@ -35,6 +35,7 @@  void tag_check(void);
 void multiorder_checks(void);
 void iteration_test(unsigned order, unsigned duration);
 void benchmark(void);
+void benchmark_xas_perf(void);
 void idr_checks(void);
 void ida_tests(void);