
[v4,sched_ext/for-6.11,2/2] sched_ext: Implement DSQ iterator

Message ID: Zoh4kp7-jAFZXhe6@slm.duckdns.org (mailing list archive)
State: Not Applicable

Commit Message

Tejun Heo July 5, 2024, 10:49 p.m. UTC
DSQs are very opaque in the consumption path. The BPF scheduler has no way
of knowing which tasks are being considered and which one is picked. This
patch adds a BPF DSQ iterator.

- Allows iterating tasks queued on a DSQ in the dispatch order or reverse
  from anywhere using bpf_for_each(scx_dsq) or calling the iterator kfuncs
  directly.

- Has ordering guarantee where only tasks which were already queued when the
  iteration started are visible and consumable during the iteration.

scx_qmap is updated to implement periodic dumping of the shared DSQ.
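
For illustration, a minimal sketch of what the iterator enables from a BPF
scheduler (the helper name and the zero flags argument are arbitrary; see the
scx_qmap change below for the example actually added by this patch):

  /* count the tasks currently queued on a user DSQ (illustrative sketch) */
  static u32 nr_tasks_on_dsq(u64 dsq_id)
  {
  	struct task_struct *p;
  	u32 nr = 0;

  	bpf_rcu_read_lock();
  	bpf_for_each(scx_dsq, p, dsq_id, 0)
  		nr++;
  	bpf_rcu_read_unlock();

  	return nr;
  }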

v4: - bpf_iter_scx_dsq_new() declaration in common.bpf.h was using the wrong
      type for the last argument (bool rev instead of u64 flags). Fix it.

v3: - Alexei pointed out that the iterator is too big to allocate on stack.
      Added a prep patch to reduce the size of the cursor. Now
      bpf_iter_scx_dsq is 48 bytes and bpf_iter_scx_dsq_kern is 40 bytes on
      64bit.

    - u32_before() comparison factored out.

v2: - scx_bpf_consume_task() is separated out into a separate patch.

    - DSQ seq and iter flags don't need to be u64. Use u32.

Signed-off-by: Tejun Heo <tj@kernel.org>
Reviewed-by: David Vernet <dvernet@meta.com>
Cc: Alexei Starovoitov <ast@kernel.org>
Cc: bpf@vger.kernel.org
---
 include/linux/sched/ext.h                |    3 
 kernel/sched/ext.c                       |  187 ++++++++++++++++++++++++++++++-
 tools/sched_ext/include/scx/common.bpf.h |    3 
 tools/sched_ext/scx_qmap.bpf.c           |   25 ++++
 tools/sched_ext/scx_qmap.c               |    8 -
 5 files changed, 222 insertions(+), 4 deletions(-)

Comments

Alexei Starovoitov July 8, 2024, 10:41 p.m. UTC | #1
On Fri, Jul 5, 2024 at 3:50 PM Tejun Heo <tj@kernel.org> wrote:
>
> DSQs are very opaque in the consumption path. The BPF scheduler has no way
> of knowing which tasks are being considered and which one is picked. This
> patch adds a BPF DSQ iterator.
>
> - Allows iterating tasks queued on a DSQ in the dispatch order or reverse
>   from anywhere using bpf_for_each(scx_dsq) or calling the iterator kfuncs
>   directly.
>
> - Has ordering guarantee where only tasks which were already queued when the
>   iteration started are visible and consumable during the iteration.
>
> scx_qmap is updated to implement periodic dumping of the shared DSQ.
>
> v4: - bpf_iter_scx_dsq_new() declaration in common.bpf.h was using the wrong
>       type for the last argument (bool rev instead of u64 flags). Fix it.

Overall lgtm.
Acked-by: Alexei Starovoitov <ast@kernel.org>

In the future pls resubmit the whole series as v4
(all patches not just one).
It was difficult for me to find the patch 1/2 without any vN tag
that corresponds to this v4 patch.
lore helped at the end.

Few nits below:

> v3: - Alexei pointed out that the iterator is too big to allocate on stack.
>       Added a prep patch to reduce the size of the cursor. Now
>       bpf_iter_scx_dsq is 48 bytes and bpf_iter_scx_dsq_kern is 40 bytes on
>       64bit.
>
>     - u32_before() comparison factored out.
>
> v2: - scx_bpf_consume_task() is separated out into a separate patch.
>
>     - DSQ seq and iter flags don't need to be u64. Use u32.
>
> Signed-off-by: Tejun Heo <tj@kernel.org>
> Reviewed-by: David Vernet <dvernet@meta.com>
> Cc: Alexei Starovoitov <ast@kernel.org>
> Cc: bpf@vger.kernel.org
> ---
>  include/linux/sched/ext.h                |    3
>  kernel/sched/ext.c                       |  187 ++++++++++++++++++++++++++++++-
>  tools/sched_ext/include/scx/common.bpf.h |    3
>  tools/sched_ext/scx_qmap.bpf.c           |   25 ++++
>  tools/sched_ext/scx_qmap.c               |    8 -
>  5 files changed, 222 insertions(+), 4 deletions(-)
>
> --- a/include/linux/sched/ext.h
> +++ b/include/linux/sched/ext.h
> @@ -61,6 +61,7 @@ struct scx_dispatch_q {
>         struct list_head        list;   /* tasks in dispatch order */
>         struct rb_root          priq;   /* used to order by p->scx.dsq_vtime */
>         u32                     nr;
> +       u32                     seq;    /* used by BPF iter */
>         u64                     id;
>         struct rhash_head       hash_node;
>         struct llist_node       free_node;
> @@ -123,6 +124,7 @@ enum scx_kf_mask {
>
>  struct scx_dsq_list_node {
>         struct list_head        node;
> +       bool                    is_bpf_iter_cursor;
>  };
>
>  /*
> @@ -133,6 +135,7 @@ struct sched_ext_entity {
>         struct scx_dispatch_q   *dsq;
>         struct scx_dsq_list_node dsq_list;      /* dispatch order */
>         struct rb_node          dsq_priq;       /* p->scx.dsq_vtime order */
> +       u32                     dsq_seq;
>         u32                     dsq_flags;      /* protected by DSQ lock */
>         u32                     flags;          /* protected by rq lock */
>         u32                     weight;
> --- a/kernel/sched/ext.c
> +++ b/kernel/sched/ext.c
> @@ -926,6 +926,11 @@ static u32 highest_bit(u32 flags)
>         return ((u64)1 << bit) >> 1;
>  }
>
> +static bool u32_before(u32 a, u32 b)
> +{
> +       return (s32)(a - b) < 0;
> +}
> +
>  /*
>   * scx_kf_mask enforcement. Some kfuncs can only be called from specific SCX
>   * ops. When invoking SCX ops, SCX_CALL_OP[_RET]() should be used to indicate
> @@ -1066,6 +1071,73 @@ static __always_inline bool scx_kf_allow
>         return true;
>  }
>
> +/**
> + * nldsq_next_task - Iterate to the next task in a non-local DSQ
> + * @dsq: user dsq being iterated
> + * @cur: current position, %NULL to start iteration
> + * @rev: walk backwards
> + *
> + * Returns %NULL when iteration is finished.
> + */
> +static struct task_struct *nldsq_next_task(struct scx_dispatch_q *dsq,
> +                                          struct task_struct *cur, bool rev)
> +{
> +       struct list_head *list_node;
> +       struct scx_dsq_list_node *dsq_lnode;
> +
> +       lockdep_assert_held(&dsq->lock);
> +
> +       if (cur)
> +               list_node = &cur->scx.dsq_list.node;
> +       else
> +               list_node = &dsq->list;
> +
> +       /* find the next task, need to skip BPF iteration cursors */
> +       do {
> +               if (rev)
> +                       list_node = list_node->prev;
> +               else
> +                       list_node = list_node->next;
> +
> +               if (list_node == &dsq->list)
> +                       return NULL;
> +
> +               dsq_lnode = container_of(list_node, struct scx_dsq_list_node,
> +                                        node);
> +       } while (dsq_lnode->is_bpf_iter_cursor);
> +
> +       return container_of(dsq_lnode, struct task_struct, scx.dsq_list);
> +}
> +
> +#define nldsq_for_each_task(p, dsq)                                            \
> +       for ((p) = nldsq_next_task((dsq), NULL, false); (p);                    \
> +            (p) = nldsq_next_task((dsq), (p), false))
> +
> +
> +/*
> + * BPF DSQ iterator. Tasks in a non-local DSQ can be iterated in [reverse]
> + * dispatch order. BPF-visible iterator is opaque and larger to allow future
> + * changes without breaking backward compatibility. Can be used with
> + * bpf_for_each(). See bpf_iter_scx_dsq_*().
> + */
> +enum scx_dsq_iter_flags {
> +       /* iterate in the reverse dispatch order */
> +       SCX_DSQ_ITER_REV                = 1U << 0,
> +
> +       __SCX_DSQ_ITER_ALL_FLAGS        = SCX_DSQ_ITER_REV,
> +};
> +
> +struct bpf_iter_scx_dsq_kern {
> +       struct scx_dsq_list_node        cursor;
> +       struct scx_dispatch_q           *dsq;
> +       u32                             dsq_seq;
> +       u32                             flags;
> +} __attribute__((aligned(8)));
> +
> +struct bpf_iter_scx_dsq {
> +       u64                             __opaque[6];
> +} __attribute__((aligned(8)));
> +
>
>  /*
>   * SCX task iterator.
> @@ -1415,7 +1487,7 @@ static void dispatch_enqueue(struct scx_
>                  * tested easily when adding the first task.
>                  */
>                 if (unlikely(RB_EMPTY_ROOT(&dsq->priq) &&
> -                            !list_empty(&dsq->list)))
> +                            nldsq_next_task(dsq, NULL, false)))

There is also consume_dispatch_q() that is doing
list_empty(&dsq->list) check.
Does it need to be updated as well?

>                         scx_ops_error("DSQ ID 0x%016llx already had FIFO-enqueued tasks",
>                                       dsq->id);
>
> @@ -1447,6 +1519,10 @@ static void dispatch_enqueue(struct scx_
>                         list_add_tail(&p->scx.dsq_list.node, &dsq->list);
>         }
>
> +       /* seq records the order tasks are queued, used by BPF DSQ iterator */
> +       dsq->seq++;
> +       p->scx.dsq_seq = dsq->seq;
> +
>         dsq_mod_nr(dsq, 1);
>         p->scx.dsq = dsq;
>
> @@ -2109,7 +2185,7 @@ retry:
>
>         raw_spin_lock(&dsq->lock);
>
> -       list_for_each_entry(p, &dsq->list, scx.dsq_list.node) {
> +       nldsq_for_each_task(p, dsq) {
>                 struct rq *task_rq = task_rq(p);
>
>                 if (rq == task_rq) {
> @@ -5697,6 +5773,110 @@ __bpf_kfunc void scx_bpf_destroy_dsq(u64
>         destroy_dsq(dsq_id);
>  }
>
> +/**
> + * bpf_iter_scx_dsq_new - Create a DSQ iterator
> + * @it: iterator to initialize
> + * @dsq_id: DSQ to iterate
> + * @flags: %SCX_DSQ_ITER_*
> + *
> + * Initialize BPF iterator @it which can be used with bpf_for_each() to walk
> + * tasks in the DSQ specified by @dsq_id. Iteration using @it only includes
> + * tasks which are already queued when this function is invoked.
> + */
> +__bpf_kfunc int bpf_iter_scx_dsq_new(struct bpf_iter_scx_dsq *it, u64 dsq_id,
> +                                    u64 flags)
> +{
> +       struct bpf_iter_scx_dsq_kern *kit = (void *)it;
> +
> +       BUILD_BUG_ON(sizeof(struct bpf_iter_scx_dsq_kern) >
> +                    sizeof(struct bpf_iter_scx_dsq));
> +       BUILD_BUG_ON(__alignof__(struct bpf_iter_scx_dsq_kern) !=
> +                    __alignof__(struct bpf_iter_scx_dsq));
> +
> +       if (flags & ~__SCX_DSQ_ITER_ALL_FLAGS)
> +               return -EINVAL;
> +
> +       kit->dsq = find_non_local_dsq(dsq_id);
> +       if (!kit->dsq)
> +               return -ENOENT;
> +
> +       INIT_LIST_HEAD(&kit->cursor.node);
> +       kit->cursor.is_bpf_iter_cursor = true;
> +       kit->dsq_seq = READ_ONCE(kit->dsq->seq);
> +       kit->flags = flags;
> +
> +       return 0;
> +}
> +
> +/**
> + * bpf_iter_scx_dsq_next - Progress a DSQ iterator
> + * @it: iterator to progress
> + *
> + * Return the next task. See bpf_iter_scx_dsq_new().
> + */
> +__bpf_kfunc struct task_struct *bpf_iter_scx_dsq_next(struct bpf_iter_scx_dsq *it)
> +{
> +       struct bpf_iter_scx_dsq_kern *kit = (void *)it;
> +       bool rev = kit->flags & SCX_DSQ_ITER_REV;
> +       struct task_struct *p;
> +       unsigned long flags;
> +
> +       if (!kit->dsq)
> +               return NULL;
> +
> +       raw_spin_lock_irqsave(&kit->dsq->lock, flags);
> +
> +       if (list_empty(&kit->cursor.node))
> +               p = NULL;
> +       else
> +               p = container_of(&kit->cursor, struct task_struct, scx.dsq_list);
> +
> +       /*
> +        * Only tasks which were queued before the iteration started are
> +        * visible. This bounds BPF iterations and guarantees that vtime never
> +        * jumps in the other direction while iterating.
> +        */
> +       do {
> +               p = nldsq_next_task(kit->dsq, p, rev);
> +       } while (p && unlikely(u32_before(kit->dsq_seq, p->scx.dsq_seq)));
> +
> +       if (p) {
> +               if (rev)
> +                       list_move_tail(&kit->cursor.node, &p->scx.dsq_list.node);
> +               else
> +                       list_move(&kit->cursor.node, &p->scx.dsq_list.node);
> +       } else {
> +               list_del_init(&kit->cursor.node);
> +       }
> +
> +       raw_spin_unlock_irqrestore(&kit->dsq->lock, flags);
> +
> +       return p;
> +}
> +
> +/**
> + * bpf_iter_scx_dsq_destroy - Destroy a DSQ iterator
> + * @it: iterator to destroy
> + *
> + * Undo bpf_iter_scx_dsq_new().
> + */
> +__bpf_kfunc void bpf_iter_scx_dsq_destroy(struct bpf_iter_scx_dsq *it)
> +{
> +       struct bpf_iter_scx_dsq_kern *kit = (void *)it;
> +
> +       if (!kit->dsq)
> +               return;
> +
> +       if (!list_empty(&kit->cursor.node)) {
> +               unsigned long flags;
> +
> +               raw_spin_lock_irqsave(&kit->dsq->lock, flags);
> +               list_del_init(&kit->cursor.node);
> +               raw_spin_unlock_irqrestore(&kit->dsq->lock, flags);
> +       }
> +       kit->dsq = NULL;
> +}
> +
>  __bpf_kfunc_end_defs();
>
>  static s32 __bstr_format(u64 *data_buf, char *line_buf, size_t line_size,
> @@ -6118,6 +6298,9 @@ BTF_KFUNCS_START(scx_kfunc_ids_any)
>  BTF_ID_FLAGS(func, scx_bpf_kick_cpu)
>  BTF_ID_FLAGS(func, scx_bpf_dsq_nr_queued)
>  BTF_ID_FLAGS(func, scx_bpf_destroy_dsq)
> +BTF_ID_FLAGS(func, bpf_iter_scx_dsq_new, KF_ITER_NEW | KF_RCU_PROTECTED)
> +BTF_ID_FLAGS(func, bpf_iter_scx_dsq_next, KF_ITER_NEXT | KF_RET_NULL)
> +BTF_ID_FLAGS(func, bpf_iter_scx_dsq_destroy, KF_ITER_DESTROY)
>  BTF_ID_FLAGS(func, scx_bpf_exit_bstr, KF_TRUSTED_ARGS)
>  BTF_ID_FLAGS(func, scx_bpf_error_bstr, KF_TRUSTED_ARGS)
>  BTF_ID_FLAGS(func, scx_bpf_dump_bstr, KF_TRUSTED_ARGS)
> --- a/tools/sched_ext/include/scx/common.bpf.h
> +++ b/tools/sched_ext/include/scx/common.bpf.h
> @@ -39,6 +39,9 @@ u32 scx_bpf_reenqueue_local(void) __ksym
>  void scx_bpf_kick_cpu(s32 cpu, u64 flags) __ksym;
>  s32 scx_bpf_dsq_nr_queued(u64 dsq_id) __ksym;
>  void scx_bpf_destroy_dsq(u64 dsq_id) __ksym;
> +int bpf_iter_scx_dsq_new(struct bpf_iter_scx_dsq *it, u64 dsq_id, u64 flags) __ksym __weak;
> +struct task_struct *bpf_iter_scx_dsq_next(struct bpf_iter_scx_dsq *it) __ksym __weak;
> +void bpf_iter_scx_dsq_destroy(struct bpf_iter_scx_dsq *it) __ksym __weak;
>  void scx_bpf_exit_bstr(s64 exit_code, char *fmt, unsigned long long *data, u32 data__sz) __ksym __weak;
>  void scx_bpf_error_bstr(char *fmt, unsigned long long *data, u32 data_len) __ksym;
>  void scx_bpf_dump_bstr(char *fmt, unsigned long long *data, u32 data_len) __ksym __weak;
> --- a/tools/sched_ext/scx_qmap.bpf.c
> +++ b/tools/sched_ext/scx_qmap.bpf.c

We typically split kernel changes vs bpf prog and selftests changes
into separate patches.

> @@ -36,6 +36,7 @@ const volatile u32 stall_user_nth;
>  const volatile u32 stall_kernel_nth;
>  const volatile u32 dsp_inf_loop_after;
>  const volatile u32 dsp_batch;
> +const volatile bool print_shared_dsq;
>  const volatile s32 disallow_tgid;
>  const volatile bool suppress_dump;
>
> @@ -604,10 +605,34 @@ out:
>         scx_bpf_put_cpumask(online);
>  }
>
> +/*
> + * Dump the currently queued tasks in the shared DSQ to demonstrate the usage of
> + * scx_bpf_dsq_nr_queued() and DSQ iterator. Raise the dispatch batch count to
> + * see meaningful dumps in the trace pipe.
> + */
> +static void dump_shared_dsq(void)
> +{
> +       struct task_struct *p;
> +       s32 nr;
> +
> +       if (!(nr = scx_bpf_dsq_nr_queued(SHARED_DSQ)))
> +               return;
> +
> +       bpf_printk("Dumping %d tasks in SHARED_DSQ in reverse order", nr);
> +
> +       bpf_rcu_read_lock();
> +       bpf_for_each(scx_dsq, p, SHARED_DSQ, SCX_DSQ_ITER_REV)
> +               bpf_printk("%s[%d]", p->comm, p->pid);
> +       bpf_rcu_read_unlock();
> +}

...

> +"  -P            Print out DSQ content to trace_pipe every second, use with -b\n"

tbh the demo of the iterator is so-so. Could have done something more
interesting :)
Tejun Heo July 8, 2024, 11:40 p.m. UTC | #2
Hello, Alexei.

On Mon, Jul 08, 2024 at 03:41:48PM -0700, Alexei Starovoitov wrote:
> In the future pls resubmit the whole series as v4
> (all patches not just one).
> It was difficult for me to find the patch 1/2 without any vN tag
> that corresponds to this v4 patch.
> lore helped at the end.

Sorry about that. That's me being lazy. It looks like even `b4 am` can't
figure out this threading.

> > @@ -1415,7 +1487,7 @@ static void dispatch_enqueue(struct scx_
> >                  * tested easily when adding the first task.
> >                  */
> >                 if (unlikely(RB_EMPTY_ROOT(&dsq->priq) &&
> > -                            !list_empty(&dsq->list)))
> > +                            nldsq_next_task(dsq, NULL, false)))
> 
> There is also consume_dispatch_q() that is doing
> list_empty(&dsq->list) check.
> Does it need to be updated as well?

The one in consume_dispatch_q() is an opportunistic unlocked test as by the
time consume_dispatch_q() is called list head update should be visible
without locking. The test should fail if there's anything on the list and
then the code locks the dsq and does proper nldsq_for_each_task(). So, yeah,
that should be a naked list_empty() test. I'll add a comment explaining
what's going on there.
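
Roughly, it is the usual optimistic-check-then-lock sequence; a sketch for
reference (names and structure illustrative, not the actual
consume_dispatch_q() code):

	if (list_empty(&dsq->list))	/* unlocked, opportunistic check */
		return false;

	raw_spin_lock(&dsq->lock);
	nldsq_for_each_task(p, dsq) {
		if (task_rq(p) == rq) {
			/* ... claim and dispatch p ... */
			raw_spin_unlock(&dsq->lock);
			return true;
		}
	}
	raw_spin_unlock(&dsq->lock);
	return false;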

...
> > @@ -6118,6 +6298,9 @@ BTF_KFUNCS_START(scx_kfunc_ids_any)
> >  BTF_ID_FLAGS(func, scx_bpf_kick_cpu)
> >  BTF_ID_FLAGS(func, scx_bpf_dsq_nr_queued)
> >  BTF_ID_FLAGS(func, scx_bpf_destroy_dsq)
> > +BTF_ID_FLAGS(func, bpf_iter_scx_dsq_new, KF_ITER_NEW | KF_RCU_PROTECTED)
> > +BTF_ID_FLAGS(func, bpf_iter_scx_dsq_next, KF_ITER_NEXT | KF_RET_NULL)
> > +BTF_ID_FLAGS(func, bpf_iter_scx_dsq_destroy, KF_ITER_DESTROY)
> >  BTF_ID_FLAGS(func, scx_bpf_exit_bstr, KF_TRUSTED_ARGS)
> >  BTF_ID_FLAGS(func, scx_bpf_error_bstr, KF_TRUSTED_ARGS)
> >  BTF_ID_FLAGS(func, scx_bpf_dump_bstr, KF_TRUSTED_ARGS)
> > --- a/tools/sched_ext/include/scx/common.bpf.h
> > +++ b/tools/sched_ext/include/scx/common.bpf.h
> > @@ -39,6 +39,9 @@ u32 scx_bpf_reenqueue_local(void) __ksym
> >  void scx_bpf_kick_cpu(s32 cpu, u64 flags) __ksym;
> >  s32 scx_bpf_dsq_nr_queued(u64 dsq_id) __ksym;
> >  void scx_bpf_destroy_dsq(u64 dsq_id) __ksym;
> > +int bpf_iter_scx_dsq_new(struct bpf_iter_scx_dsq *it, u64 dsq_id, u64 flags) __ksym __weak;
> > +struct task_struct *bpf_iter_scx_dsq_next(struct bpf_iter_scx_dsq *it) __ksym __weak;
> > +void bpf_iter_scx_dsq_destroy(struct bpf_iter_scx_dsq *it) __ksym __weak;
> >  void scx_bpf_exit_bstr(s64 exit_code, char *fmt, unsigned long long *data, u32 data__sz) __ksym __weak;
> >  void scx_bpf_error_bstr(char *fmt, unsigned long long *data, u32 data_len) __ksym;
> >  void scx_bpf_dump_bstr(char *fmt, unsigned long long *data, u32 data_len) __ksym __weak;
> > --- a/tools/sched_ext/scx_qmap.bpf.c
> > +++ b/tools/sched_ext/scx_qmap.bpf.c
> 
> We typically split kernel changes vs bpf prog and selftests changes
> into separate patches.

Let me think about that. I kinda like putting them into the same patch as
long as they're small as it makes the patch more self-contained but yeah
separating out does have its benefits (e.g. for backporting).

> > +"  -P            Print out DSQ content to trace_pipe every second, use with -b\n"
> 
> tbh the demo of the iterator is so-so. Could have done something more
> interesting :)

Yeah, it's difficult to do something actually interesting with scx_qmap.
Once the scx_bpf_consume_task() part lands, the example can become more
interesting. scx_lavd is already using the iterator. Its usage is a lot more
interesting and actually useful (note that the syntax is a bit different
right now, will be synced soon):

  https://github.com/sched-ext/scx/blob/main/scheds/rust/scx_lavd/src/bpf/main.bpf.c#L2041

Thanks.
Alexei Starovoitov July 9, 2024, 1:50 a.m. UTC | #3
On Mon, Jul 8, 2024 at 4:40 PM Tejun Heo <tj@kernel.org> wrote:
>
>
> > > @@ -1415,7 +1487,7 @@ static void dispatch_enqueue(struct scx_
> > >                  * tested easily when adding the first task.
> > >                  */
> > >                 if (unlikely(RB_EMPTY_ROOT(&dsq->priq) &&
> > > -                            !list_empty(&dsq->list)))
> > > +                            nldsq_next_task(dsq, NULL, false)))
> >
> > There is also consume_dispatch_q() that is doing
> > list_empty(&dsq->list) check.
> > Does it need to be updated as well?
>
> The one in consume_dispatch_q() is an opportunistic unlocked test as by the
> time consume_dispatch_q() is called list head update should be visible
> without locking. The test should fail if there's anything on the list and
> then the code locks the dsq and does proper nldsq_for_each_task(). So, yeah,
> that should be a naked list_empty() test. I'll add a comment explaining
> what's going on there.

I see. Thanks for adding a comment.

> > > --- a/tools/sched_ext/scx_qmap.bpf.c
> > > +++ b/tools/sched_ext/scx_qmap.bpf.c
> >
> > We typically split kernel changes vs bpf prog and selftests changes
> > into separate patches.
>
> Let me think about that. I kinda like putting them into the same patch as
> long as they're small as it makes the patch more self-contained but yeah
> separating out does have its benefits (e.g. for backporting).

We split kernel vs libbpf vs selftest patches, because libbpf patches
get synced into github and it's released from there, while
kernel patches get backported, and selftests don't have to be backported.

> > > +"  -P            Print out DSQ content to trace_pipe every second, use with -b\n"
> >
> > tbh the demo of the iterator is so-so. Could have done something more
> > interesting :)
>
> Yeah, it's difficult to do something actually interesting with scx_qmap.
> Once the scx_bpf_consume_task() part lands, the example can become more
> interesting. scx_lavd is already using the iterator. Its usage is a lot more
> interesting and actually useful (note that the syntax is a bit different
> right now, will be synced soon):
>
>   https://github.com/sched-ext/scx/blob/main/scheds/rust/scx_lavd/src/bpf/main.bpf.c#L2041

Thanks for the extra context.

Patch

--- a/include/linux/sched/ext.h
+++ b/include/linux/sched/ext.h
@@ -61,6 +61,7 @@  struct scx_dispatch_q {
 	struct list_head	list;	/* tasks in dispatch order */
 	struct rb_root		priq;	/* used to order by p->scx.dsq_vtime */
 	u32			nr;
+	u32			seq;	/* used by BPF iter */
 	u64			id;
 	struct rhash_head	hash_node;
 	struct llist_node	free_node;
@@ -123,6 +124,7 @@  enum scx_kf_mask {
 
 struct scx_dsq_list_node {
 	struct list_head	node;
+	bool			is_bpf_iter_cursor;
 };
 
 /*
@@ -133,6 +135,7 @@  struct sched_ext_entity {
 	struct scx_dispatch_q	*dsq;
 	struct scx_dsq_list_node dsq_list;	/* dispatch order */
 	struct rb_node		dsq_priq;	/* p->scx.dsq_vtime order */
+	u32			dsq_seq;
 	u32			dsq_flags;	/* protected by DSQ lock */
 	u32			flags;		/* protected by rq lock */
 	u32			weight;
--- a/kernel/sched/ext.c
+++ b/kernel/sched/ext.c
@@ -926,6 +926,11 @@  static u32 highest_bit(u32 flags)
 	return ((u64)1 << bit) >> 1;
 }
 
+static bool u32_before(u32 a, u32 b)
+{
+	return (s32)(a - b) < 0;
+}
+
 /*
  * scx_kf_mask enforcement. Some kfuncs can only be called from specific SCX
  * ops. When invoking SCX ops, SCX_CALL_OP[_RET]() should be used to indicate
@@ -1066,6 +1071,73 @@  static __always_inline bool scx_kf_allow
 	return true;
 }
 
+/**
+ * nldsq_next_task - Iterate to the next task in a non-local DSQ
+ * @dsq: user dsq being iterated
+ * @cur: current position, %NULL to start iteration
+ * @rev: walk backwards
+ *
+ * Returns %NULL when iteration is finished.
+ */
+static struct task_struct *nldsq_next_task(struct scx_dispatch_q *dsq,
+					   struct task_struct *cur, bool rev)
+{
+	struct list_head *list_node;
+	struct scx_dsq_list_node *dsq_lnode;
+
+	lockdep_assert_held(&dsq->lock);
+
+	if (cur)
+		list_node = &cur->scx.dsq_list.node;
+	else
+		list_node = &dsq->list;
+
+	/* find the next task, need to skip BPF iteration cursors */
+	do {
+		if (rev)
+			list_node = list_node->prev;
+		else
+			list_node = list_node->next;
+
+		if (list_node == &dsq->list)
+			return NULL;
+
+		dsq_lnode = container_of(list_node, struct scx_dsq_list_node,
+					 node);
+	} while (dsq_lnode->is_bpf_iter_cursor);
+
+	return container_of(dsq_lnode, struct task_struct, scx.dsq_list);
+}
+
+#define nldsq_for_each_task(p, dsq)						\
+	for ((p) = nldsq_next_task((dsq), NULL, false); (p);			\
+	     (p) = nldsq_next_task((dsq), (p), false))
+
+
+/*
+ * BPF DSQ iterator. Tasks in a non-local DSQ can be iterated in [reverse]
+ * dispatch order. BPF-visible iterator is opaque and larger to allow future
+ * changes without breaking backward compatibility. Can be used with
+ * bpf_for_each(). See bpf_iter_scx_dsq_*().
+ */
+enum scx_dsq_iter_flags {
+	/* iterate in the reverse dispatch order */
+	SCX_DSQ_ITER_REV		= 1U << 0,
+
+	__SCX_DSQ_ITER_ALL_FLAGS	= SCX_DSQ_ITER_REV,
+};
+
+struct bpf_iter_scx_dsq_kern {
+	struct scx_dsq_list_node	cursor;
+	struct scx_dispatch_q		*dsq;
+	u32				dsq_seq;
+	u32				flags;
+} __attribute__((aligned(8)));
+
+struct bpf_iter_scx_dsq {
+	u64				__opaque[6];
+} __attribute__((aligned(8)));
+
 
 /*
  * SCX task iterator.
@@ -1415,7 +1487,7 @@  static void dispatch_enqueue(struct scx_
 		 * tested easily when adding the first task.
 		 */
 		if (unlikely(RB_EMPTY_ROOT(&dsq->priq) &&
-			     !list_empty(&dsq->list)))
+			     nldsq_next_task(dsq, NULL, false)))
 			scx_ops_error("DSQ ID 0x%016llx already had FIFO-enqueued tasks",
 				      dsq->id);
 
@@ -1447,6 +1519,10 @@  static void dispatch_enqueue(struct scx_
 			list_add_tail(&p->scx.dsq_list.node, &dsq->list);
 	}
 
+	/* seq records the order tasks are queued, used by BPF DSQ iterator */
+	dsq->seq++;
+	p->scx.dsq_seq = dsq->seq;
+
 	dsq_mod_nr(dsq, 1);
 	p->scx.dsq = dsq;
 
@@ -2109,7 +2185,7 @@  retry:
 
 	raw_spin_lock(&dsq->lock);
 
-	list_for_each_entry(p, &dsq->list, scx.dsq_list.node) {
+	nldsq_for_each_task(p, dsq) {
 		struct rq *task_rq = task_rq(p);
 
 		if (rq == task_rq) {
@@ -5697,6 +5773,110 @@  __bpf_kfunc void scx_bpf_destroy_dsq(u64
 	destroy_dsq(dsq_id);
 }
 
+/**
+ * bpf_iter_scx_dsq_new - Create a DSQ iterator
+ * @it: iterator to initialize
+ * @dsq_id: DSQ to iterate
+ * @flags: %SCX_DSQ_ITER_*
+ *
+ * Initialize BPF iterator @it which can be used with bpf_for_each() to walk
+ * tasks in the DSQ specified by @dsq_id. Iteration using @it only includes
+ * tasks which are already queued when this function is invoked.
+ */
+__bpf_kfunc int bpf_iter_scx_dsq_new(struct bpf_iter_scx_dsq *it, u64 dsq_id,
+				     u64 flags)
+{
+	struct bpf_iter_scx_dsq_kern *kit = (void *)it;
+
+	BUILD_BUG_ON(sizeof(struct bpf_iter_scx_dsq_kern) >
+		     sizeof(struct bpf_iter_scx_dsq));
+	BUILD_BUG_ON(__alignof__(struct bpf_iter_scx_dsq_kern) !=
+		     __alignof__(struct bpf_iter_scx_dsq));
+
+	if (flags & ~__SCX_DSQ_ITER_ALL_FLAGS)
+		return -EINVAL;
+
+	kit->dsq = find_non_local_dsq(dsq_id);
+	if (!kit->dsq)
+		return -ENOENT;
+
+	INIT_LIST_HEAD(&kit->cursor.node);
+	kit->cursor.is_bpf_iter_cursor = true;
+	kit->dsq_seq = READ_ONCE(kit->dsq->seq);
+	kit->flags = flags;
+
+	return 0;
+}
+
+/**
+ * bpf_iter_scx_dsq_next - Progress a DSQ iterator
+ * @it: iterator to progress
+ *
+ * Return the next task. See bpf_iter_scx_dsq_new().
+ */
+__bpf_kfunc struct task_struct *bpf_iter_scx_dsq_next(struct bpf_iter_scx_dsq *it)
+{
+	struct bpf_iter_scx_dsq_kern *kit = (void *)it;
+	bool rev = kit->flags & SCX_DSQ_ITER_REV;
+	struct task_struct *p;
+	unsigned long flags;
+
+	if (!kit->dsq)
+		return NULL;
+
+	raw_spin_lock_irqsave(&kit->dsq->lock, flags);
+
+	if (list_empty(&kit->cursor.node))
+		p = NULL;
+	else
+		p = container_of(&kit->cursor, struct task_struct, scx.dsq_list);
+
+	/*
+	 * Only tasks which were queued before the iteration started are
+	 * visible. This bounds BPF iterations and guarantees that vtime never
+	 * jumps in the other direction while iterating.
+	 */
+	do {
+		p = nldsq_next_task(kit->dsq, p, rev);
+	} while (p && unlikely(u32_before(kit->dsq_seq, p->scx.dsq_seq)));
+
+	if (p) {
+		if (rev)
+			list_move_tail(&kit->cursor.node, &p->scx.dsq_list.node);
+		else
+			list_move(&kit->cursor.node, &p->scx.dsq_list.node);
+	} else {
+		list_del_init(&kit->cursor.node);
+	}
+
+	raw_spin_unlock_irqrestore(&kit->dsq->lock, flags);
+
+	return p;
+}
+
+/**
+ * bpf_iter_scx_dsq_destroy - Destroy a DSQ iterator
+ * @it: iterator to destroy
+ *
+ * Undo bpf_iter_scx_dsq_new().
+ */
+__bpf_kfunc void bpf_iter_scx_dsq_destroy(struct bpf_iter_scx_dsq *it)
+{
+	struct bpf_iter_scx_dsq_kern *kit = (void *)it;
+
+	if (!kit->dsq)
+		return;
+
+	if (!list_empty(&kit->cursor.node)) {
+		unsigned long flags;
+
+		raw_spin_lock_irqsave(&kit->dsq->lock, flags);
+		list_del_init(&kit->cursor.node);
+		raw_spin_unlock_irqrestore(&kit->dsq->lock, flags);
+	}
+	kit->dsq = NULL;
+}
+
 __bpf_kfunc_end_defs();
 
 static s32 __bstr_format(u64 *data_buf, char *line_buf, size_t line_size,
@@ -6118,6 +6298,9 @@  BTF_KFUNCS_START(scx_kfunc_ids_any)
 BTF_ID_FLAGS(func, scx_bpf_kick_cpu)
 BTF_ID_FLAGS(func, scx_bpf_dsq_nr_queued)
 BTF_ID_FLAGS(func, scx_bpf_destroy_dsq)
+BTF_ID_FLAGS(func, bpf_iter_scx_dsq_new, KF_ITER_NEW | KF_RCU_PROTECTED)
+BTF_ID_FLAGS(func, bpf_iter_scx_dsq_next, KF_ITER_NEXT | KF_RET_NULL)
+BTF_ID_FLAGS(func, bpf_iter_scx_dsq_destroy, KF_ITER_DESTROY)
 BTF_ID_FLAGS(func, scx_bpf_exit_bstr, KF_TRUSTED_ARGS)
 BTF_ID_FLAGS(func, scx_bpf_error_bstr, KF_TRUSTED_ARGS)
 BTF_ID_FLAGS(func, scx_bpf_dump_bstr, KF_TRUSTED_ARGS)
--- a/tools/sched_ext/include/scx/common.bpf.h
+++ b/tools/sched_ext/include/scx/common.bpf.h
@@ -39,6 +39,9 @@  u32 scx_bpf_reenqueue_local(void) __ksym
 void scx_bpf_kick_cpu(s32 cpu, u64 flags) __ksym;
 s32 scx_bpf_dsq_nr_queued(u64 dsq_id) __ksym;
 void scx_bpf_destroy_dsq(u64 dsq_id) __ksym;
+int bpf_iter_scx_dsq_new(struct bpf_iter_scx_dsq *it, u64 dsq_id, u64 flags) __ksym __weak;
+struct task_struct *bpf_iter_scx_dsq_next(struct bpf_iter_scx_dsq *it) __ksym __weak;
+void bpf_iter_scx_dsq_destroy(struct bpf_iter_scx_dsq *it) __ksym __weak;
 void scx_bpf_exit_bstr(s64 exit_code, char *fmt, unsigned long long *data, u32 data__sz) __ksym __weak;
 void scx_bpf_error_bstr(char *fmt, unsigned long long *data, u32 data_len) __ksym;
 void scx_bpf_dump_bstr(char *fmt, unsigned long long *data, u32 data_len) __ksym __weak;
--- a/tools/sched_ext/scx_qmap.bpf.c
+++ b/tools/sched_ext/scx_qmap.bpf.c
@@ -36,6 +36,7 @@  const volatile u32 stall_user_nth;
 const volatile u32 stall_kernel_nth;
 const volatile u32 dsp_inf_loop_after;
 const volatile u32 dsp_batch;
+const volatile bool print_shared_dsq;
 const volatile s32 disallow_tgid;
 const volatile bool suppress_dump;
 
@@ -604,10 +605,34 @@  out:
 	scx_bpf_put_cpumask(online);
 }
 
+/*
+ * Dump the currently queued tasks in the shared DSQ to demonstrate the usage of
+ * scx_bpf_dsq_nr_queued() and DSQ iterator. Raise the dispatch batch count to
+ * see meaningful dumps in the trace pipe.
+ */
+static void dump_shared_dsq(void)
+{
+	struct task_struct *p;
+	s32 nr;
+
+	if (!(nr = scx_bpf_dsq_nr_queued(SHARED_DSQ)))
+		return;
+
+	bpf_printk("Dumping %d tasks in SHARED_DSQ in reverse order", nr);
+
+	bpf_rcu_read_lock();
+	bpf_for_each(scx_dsq, p, SHARED_DSQ, SCX_DSQ_ITER_REV)
+		bpf_printk("%s[%d]", p->comm, p->pid);
+	bpf_rcu_read_unlock();
+}
+
 static int monitor_timerfn(void *map, int *key, struct bpf_timer *timer)
 {
 	monitor_cpuperf();
 
+	if (print_shared_dsq)
+		dump_shared_dsq();
+
 	bpf_timer_start(timer, ONE_SEC_IN_NS, 0);
 	return 0;
 }
--- a/tools/sched_ext/scx_qmap.c
+++ b/tools/sched_ext/scx_qmap.c
@@ -20,7 +20,7 @@  const char help_fmt[] =
 "See the top-level comment in .bpf.c for more details.\n"
 "\n"
 "Usage: %s [-s SLICE_US] [-e COUNT] [-t COUNT] [-T COUNT] [-l COUNT] [-b COUNT]\n"
-"       [-d PID] [-D LEN] [-p] [-v]\n"
+"       [-P] [-d PID] [-D LEN] [-p] [-v]\n"
 "\n"
 "  -s SLICE_US   Override slice duration\n"
 "  -e COUNT      Trigger scx_bpf_error() after COUNT enqueues\n"
@@ -28,6 +28,7 @@  const char help_fmt[] =
 "  -T COUNT      Stall every COUNT'th kernel thread\n"
 "  -l COUNT      Trigger dispatch infinite looping after COUNT dispatches\n"
 "  -b COUNT      Dispatch upto COUNT tasks together\n"
+"  -P            Print out DSQ content to trace_pipe every second, use with -b\n"
 "  -d PID        Disallow a process from switching into SCHED_EXT (-1 for self)\n"
 "  -D LEN        Set scx_exit_info.dump buffer length\n"
 "  -S            Suppress qmap-specific debug dump\n"
@@ -62,7 +63,7 @@  int main(int argc, char **argv)
 
 	skel = SCX_OPS_OPEN(qmap_ops, scx_qmap);
 
-	while ((opt = getopt(argc, argv, "s:e:t:T:l:b:d:D:Spvh")) != -1) {
+	while ((opt = getopt(argc, argv, "s:e:t:T:l:b:Pd:D:Spvh")) != -1) {
 		switch (opt) {
 		case 's':
 			skel->rodata->slice_ns = strtoull(optarg, NULL, 0) * 1000;
@@ -82,6 +83,9 @@  int main(int argc, char **argv)
 		case 'b':
 			skel->rodata->dsp_batch = strtoul(optarg, NULL, 0);
 			break;
+		case 'P':
+			skel->rodata->print_shared_dsq = true;
+			break;
 		case 'd':
 			skel->rodata->disallow_tgid = strtol(optarg, NULL, 0);
 			if (skel->rodata->disallow_tgid < 0)