
[3/5] bpf: Add bpf_user_ringbuf_drain() helper

Message ID 20220808155341.2479054-3-void@manifault.com (mailing list archive)
State Superseded
Delegated to: BPF
Series bpf: Add user-space-publisher ringbuffer map type

Checks

Context Check Description
netdev/tree_selection success Guessed tree name to be net-next, async
netdev/fixes_present success Fixes tag not required for -next series
netdev/subject_prefix success Link
netdev/cover_letter success Series has a cover letter
netdev/patch_count success Link
netdev/header_inline success No static functions without inline keyword in header files
netdev/build_32bit fail Errors and warnings before: 1723 this patch: 1727
netdev/cc_maintainers success CCed 12 of 12 maintainers
netdev/build_clang success Errors and warnings before: 177 this patch: 177
netdev/module_param success Was 0 now: 0
netdev/verify_signedoff success Signed-off-by tag matches author and committer
netdev/check_selftest success No net selftest shell script
netdev/verify_fixes success No Fixes tag
netdev/build_allmodconfig_warn success Errors and warnings before: 1717 this patch: 1717
netdev/checkpatch warning CHECK: Alignment should match open parenthesis CHECK: Blank lines aren't necessary after an open brace '{' CHECK: Please don't use multiple blank lines WARNING: line length of 81 exceeds 80 columns WARNING: line length of 85 exceeds 80 columns WARNING: line length of 92 exceeds 80 columns
netdev/kdoc success Errors and warnings before: 0 this patch: 0
netdev/source_inline success Was 0 now: 0
bpf/vmtest-bpf-next-PR fail PR summary
bpf/vmtest-bpf-next-VM_Test-1 fail Logs for Kernel LATEST on ubuntu-latest with gcc
bpf/vmtest-bpf-next-VM_Test-2 fail Logs for Kernel LATEST on ubuntu-latest with llvm-16
bpf/vmtest-bpf-next-VM_Test-3 fail Logs for Kernel LATEST on z15 with gcc

Commit Message

David Vernet Aug. 8, 2022, 3:53 p.m. UTC
Now that we have a BPF_MAP_TYPE_USER_RINGBUF map type, we need to add a
helper function that allows BPF programs to drain samples from the ring
buffer, and invoke a callback for each. This patch adds a new
bpf_user_ringbuf_drain() helper that provides this abstraction.

In order to support this, we needed to also add a new PTR_TO_DYNPTR
register type to reflect a dynptr that was allocated by a helper function
and passed to a BPF program. The verifier currently only supports
PTR_TO_DYNPTR registers that are also DYNPTR_TYPE_LOCAL and MEM_ALLOC.

Signed-off-by: David Vernet <void@manifault.com>
---
 include/linux/bpf.h            |   6 +-
 include/uapi/linux/bpf.h       |   8 ++
 kernel/bpf/helpers.c           |   2 +
 kernel/bpf/ringbuf.c           | 157 +++++++++++++++++++++++++++++++--
 kernel/bpf/verifier.c          |  57 +++++++++++-
 tools/include/uapi/linux/bpf.h |   8 ++
 6 files changed, 228 insertions(+), 10 deletions(-)
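
For illustration, a minimal BPF-side sketch of how the new helper is meant to
be used (all names and the program section are illustrative, and the headers
are assumed to carry the UAPI additions from this series; the callback
signature follows the one set up by set_user_ringbuf_callback_state() below):

        #include <linux/bpf.h>
        #include <bpf/bpf_helpers.h>

        struct {
                __uint(type, BPF_MAP_TYPE_USER_RINGBUF);
                __uint(max_entries, 256 * 1024);
        } user_ringbuf SEC(".maps");

        static long handle_sample(struct bpf_dynptr *dynptr, void *ctx)
        {
                /* Read the sample via bpf_dynptr_read() / bpf_dynptr_data().
                 * Returning non-zero stops the drain early.
                 */
                return 0;
        }

        SEC("tracepoint/syscalls/sys_enter_getpid")
        int drain_user_samples(void *ctx)
        {
                /* Returns the number of drained samples, or a negative error. */
                long num = bpf_user_ringbuf_drain(&user_ringbuf, handle_sample, NULL, 0);

                return num < 0 ? 1 : 0;
        }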

Comments

kernel test robot Aug. 8, 2022, 9:55 p.m. UTC | #1
Hi David,

Thank you for the patch! Perhaps something to improve:

[auto build test WARNING on bpf-next/master]
[also build test WARNING on bpf/master linus/master next-20220808]
[cannot apply to v5.19]
[If your patch is applied to the wrong git tree, kindly drop us a note.
And when submitting patch, we suggest to use '--base' as documented in
https://git-scm.com/docs/git-format-patch#_base_tree_information]

url:    https://github.com/intel-lab-lkp/linux/commits/David-Vernet/bpf-Clear-callee-saved-regs-after-updating-REG0/20220808-235558
base:   https://git.kernel.org/pub/scm/linux/kernel/git/bpf/bpf-next.git master
config: arc-allyesconfig (https://download.01.org/0day-ci/archive/20220809/202208090505.me3WqXOM-lkp@intel.com/config)
compiler: arceb-elf-gcc (GCC) 12.1.0
reproduce (this is a W=1 build):
        wget https://raw.githubusercontent.com/intel/lkp-tests/master/sbin/make.cross -O ~/bin/make.cross
        chmod +x ~/bin/make.cross
        # https://github.com/intel-lab-lkp/linux/commit/70db51b231aeddaa6eecd19afeeebef610ae2686
        git remote add linux-review https://github.com/intel-lab-lkp/linux
        git fetch --no-tags linux-review David-Vernet/bpf-Clear-callee-saved-regs-after-updating-REG0/20220808-235558
        git checkout 70db51b231aeddaa6eecd19afeeebef610ae2686
        # save the config file
        mkdir build_dir && cp config build_dir/.config
        COMPILER_INSTALL_PATH=$HOME/0day COMPILER=gcc-12.1.0 make.cross W=1 O=build_dir ARCH=arc SHELL=/bin/bash kernel/bpf/

If you fix the issue, kindly add following tag where applicable
Reported-by: kernel test robot <lkp@intel.com>

All warnings (new ones prefixed by >>):

   kernel/bpf/ringbuf.c: In function '__bpf_user_ringbuf_poll':
>> kernel/bpf/ringbuf.c:653:15: warning: cast to pointer from integer of different size [-Wint-to-pointer-cast]
     653 |         hdr = (u32 *)((uintptr_t)rb->data + (cons_pos & rb->mask));
         |               ^
   kernel/bpf/ringbuf.c:682:19: warning: cast to pointer from integer of different size [-Wint-to-pointer-cast]
     682 |         *sample = (void *)((uintptr_t)rb->data +
         |                   ^
   kernel/bpf/ringbuf.c: In function '____bpf_user_ringbuf_drain':
>> kernel/bpf/ringbuf.c:736:40: warning: cast from pointer to integer of different size [-Wpointer-to-int-cast]
     736 |                         ret = callback((u64)&dynptr,
         |                                        ^


vim +653 kernel/bpf/ringbuf.c

   626	
   627	static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample,
   628					   u32 *size)
   629	{
   630		unsigned long cons_pos, prod_pos;
   631		u32 sample_len, total_len;
   632		u32 *hdr;
   633		int err;
   634		int busy = 0;
   635	
   636		/* If another consumer is already consuming a sample, wait for them to
   637		 * finish.
   638		 */
   639		if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
   640			return -EBUSY;
   641	
   642		/* Synchronizes with smp_store_release() in user-space. */
   643		prod_pos = smp_load_acquire(&rb->producer_pos);
   644		/* Synchronizes with smp_store_release() in
   645		 * __bpf_user_ringbuf_sample_release().
   646		 */
   647		cons_pos = smp_load_acquire(&rb->consumer_pos);
   648		if (cons_pos >= prod_pos) {
   649			atomic_set(&rb->busy, 0);
   650			return -ENODATA;
   651		}
   652	
 > 653		hdr = (u32 *)((uintptr_t)rb->data + (cons_pos & rb->mask));
   654		sample_len = *hdr;
   655	
   656		/* Check that the sample can fit into a dynptr. */
   657		err = bpf_dynptr_check_size(sample_len);
   658		if (err) {
   659			atomic_set(&rb->busy, 0);
   660			return err;
   661		}
   662	
   663		/* Check that the sample fits within the region advertised by the
   664		 * consumer position.
   665		 */
   666		total_len = sample_len + BPF_RINGBUF_HDR_SZ;
   667		if (total_len > prod_pos - cons_pos) {
   668			atomic_set(&rb->busy, 0);
   669			return -E2BIG;
   670		}
   671	
   672		/* Check that the sample fits within the data region of the ring buffer.
   673		 */
   674		if (total_len > rb->mask + 1) {
   675			atomic_set(&rb->busy, 0);
   676			return -E2BIG;
   677		}
   678	
   679		/* consumer_pos is updated when the sample is released.
   680		 */
   681	
   682		*sample = (void *)((uintptr_t)rb->data +
   683				   ((cons_pos + BPF_RINGBUF_HDR_SZ) & rb->mask));
   684		*size = sample_len;
   685	
   686		return 0;
   687	}
   688	
   689	static void
   690	__bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
   691					  u64 flags)
   692	{
   693	
   694	
   695		/* To release the ringbuffer, just increment the producer position to
   696		 * signal that a new sample can be consumed. The busy bit is cleared by
   697		 * userspace when posting a new sample to the ringbuffer.
   698		 */
   699		smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
   700				  BPF_RINGBUF_HDR_SZ);
   701	
   702		if (flags & BPF_RB_FORCE_WAKEUP || !(flags & BPF_RB_NO_WAKEUP))
   703			irq_work_queue(&rb->work);
   704	
   705		atomic_set(&rb->busy, 0);
   706	}
   707	
   708	BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,
   709		   void *, callback_fn, void *, callback_ctx, u64, flags)
   710	{
   711		struct bpf_ringbuf *rb;
   712		long num_samples = 0, ret = 0;
   713		int err;
   714		bpf_callback_t callback = (bpf_callback_t)callback_fn;
   715		u64 wakeup_flags = BPF_RB_NO_WAKEUP | BPF_RB_FORCE_WAKEUP;
   716	
   717		if (unlikely(flags & ~wakeup_flags))
   718			return -EINVAL;
   719	
   720		/* The two wakeup flags are mutually exclusive. */
   721		if (unlikely((flags & wakeup_flags) == wakeup_flags))
   722			return -EINVAL;
   723	
   724		rb = container_of(map, struct bpf_ringbuf_map, map)->rb;
   725		do {
   726			u32 size;
   727			void *sample;
   728	
   729			err = __bpf_user_ringbuf_poll(rb, &sample, &size);
   730	
   731			if (!err) {
   732				struct bpf_dynptr_kern dynptr;
   733	
   734				bpf_dynptr_init(&dynptr, sample, BPF_DYNPTR_TYPE_LOCAL,
   735						0, size);
 > 736				ret = callback((u64)&dynptr,
   737						(u64)(uintptr_t)callback_ctx, 0, 0, 0);
   738	
   739				__bpf_user_ringbuf_sample_release(rb, size, flags);
   740				num_samples++;
   741			}
   742		} while (err == 0 && num_samples < 4096 && ret == 0);
   743	
   744		return num_samples;
   745	}
   746
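
One way to address the 32-bit cast warning at line 736 above is to go through
uintptr_t before widening the dynptr pointer to u64, mirroring what is already
done for callback_ctx (sketch):

			ret = callback((u64)(uintptr_t)&dynptr,
				       (u64)(uintptr_t)callback_ctx, 0, 0, 0);
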
Andrii Nakryiko Aug. 11, 2022, 11:22 p.m. UTC | #2
On Mon, Aug 8, 2022 at 8:54 AM David Vernet <void@manifault.com> wrote:
>
> Now that we have a BPF_MAP_TYPE_USER_RINGBUF map type, we need to add a
> helper function that allows BPF programs to drain samples from the ring
> buffer, and invoke a callback for each. This patch adds a new
> bpf_user_ringbuf_drain() helper that provides this abstraction.
>
> In order to support this, we needed to also add a new PTR_TO_DYNPTR
> register type to reflect a dynptr that was allocated by a helper function
> and passed to a BPF program. The verifier currently only supports
> PTR_TO_DYNPTR registers that are also DYNPTR_TYPE_LOCAL and MEM_ALLOC.

This commit message is a bit too laconic. There are a lot of
implications in various parts of this patch; it would be great to
highlight the most important ones. Please consider elaborating a bit more.

>
> Signed-off-by: David Vernet <void@manifault.com>
> ---
>  include/linux/bpf.h            |   6 +-
>  include/uapi/linux/bpf.h       |   8 ++
>  kernel/bpf/helpers.c           |   2 +
>  kernel/bpf/ringbuf.c           | 157 +++++++++++++++++++++++++++++++--
>  kernel/bpf/verifier.c          |  57 +++++++++++-
>  tools/include/uapi/linux/bpf.h |   8 ++
>  6 files changed, 228 insertions(+), 10 deletions(-)
>
> diff --git a/include/linux/bpf.h b/include/linux/bpf.h
> index 20c26aed7896..4d0d5f2a3ac8 100644
> --- a/include/linux/bpf.h
> +++ b/include/linux/bpf.h
> @@ -401,7 +401,7 @@ enum bpf_type_flag {
>         /* DYNPTR points to memory local to the bpf program. */
>         DYNPTR_TYPE_LOCAL       = BIT(8 + BPF_BASE_TYPE_BITS),
>
> -       /* DYNPTR points to a ringbuf record. */
> +       /* DYNPTR points to a kernel-produced ringbuf record. */
>         DYNPTR_TYPE_RINGBUF     = BIT(9 + BPF_BASE_TYPE_BITS),
>
>         /* Size is known at compile time. */
> @@ -606,6 +606,7 @@ enum bpf_reg_type {
>         PTR_TO_MEM,              /* reg points to valid memory region */
>         PTR_TO_BUF,              /* reg points to a read/write buffer */
>         PTR_TO_FUNC,             /* reg points to a bpf program function */
> +       PTR_TO_DYNPTR,           /* reg points to a dynptr */
>         __BPF_REG_TYPE_MAX,
>
>         /* Extended reg_types. */
> @@ -2410,6 +2411,7 @@ extern const struct bpf_func_proto bpf_loop_proto;
>  extern const struct bpf_func_proto bpf_copy_from_user_task_proto;
>  extern const struct bpf_func_proto bpf_set_retval_proto;
>  extern const struct bpf_func_proto bpf_get_retval_proto;
> +extern const struct bpf_func_proto bpf_user_ringbuf_drain_proto;
>
>  const struct bpf_func_proto *tracing_prog_func_proto(
>    enum bpf_func_id func_id, const struct bpf_prog *prog);
> @@ -2554,7 +2556,7 @@ enum bpf_dynptr_type {
>         BPF_DYNPTR_TYPE_INVALID,
>         /* Points to memory that is local to the bpf program */
>         BPF_DYNPTR_TYPE_LOCAL,
> -       /* Underlying data is a ringbuf record */
> +       /* Underlying data is a kernel-produced ringbuf record */
>         BPF_DYNPTR_TYPE_RINGBUF,
>  };
>
> diff --git a/include/uapi/linux/bpf.h b/include/uapi/linux/bpf.h
> index a341f877b230..ca125648d7fd 100644
> --- a/include/uapi/linux/bpf.h
> +++ b/include/uapi/linux/bpf.h
> @@ -5332,6 +5332,13 @@ union bpf_attr {
>   *             **-EACCES** if the SYN cookie is not valid.
>   *
>   *             **-EPROTONOSUPPORT** if CONFIG_IPV6 is not builtin.
> + *
> + * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
> + *     Description
> + *             Drain samples from the specified user ringbuffer, and invoke the
> + *             provided callback for each such sample.

please specify what's the expected signature of callback_fn

> + *     Return
> + *             An error if a sample could not be drained.

Negative error, right? And might be worth it briefly describing what
are the situation(s) when you won't be able to drain a sample?

Also please clarify if having no sample to drain is an error or not?

It's also important to specify that if no error (and -ENODATA
shouldn't be an error, actually) occurred then we get number of
consumed samples back.

>   */
>  #define __BPF_FUNC_MAPPER(FN)          \
>         FN(unspec),                     \
> @@ -5542,6 +5549,7 @@ union bpf_attr {
>         FN(tcp_raw_gen_syncookie_ipv6), \
>         FN(tcp_raw_check_syncookie_ipv4),       \
>         FN(tcp_raw_check_syncookie_ipv6),       \
> +       FN(user_ringbuf_drain),         \
>         /* */
>
>  /* integer value in 'imm' field of BPF_CALL instruction selects which helper
> diff --git a/kernel/bpf/helpers.c b/kernel/bpf/helpers.c
> index 1f961f9982d2..b8097876014c 100644
> --- a/kernel/bpf/helpers.c
> +++ b/kernel/bpf/helpers.c
> @@ -1647,6 +1647,8 @@ bpf_base_func_proto(enum bpf_func_id func_id)
>                 return &bpf_dynptr_write_proto;
>         case BPF_FUNC_dynptr_data:
>                 return &bpf_dynptr_data_proto;
> +       case BPF_FUNC_user_ringbuf_drain:
> +               return &bpf_user_ringbuf_drain_proto;
>         default:
>                 break;
>         }
> diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
> index 29e2de42df15..fc589fc8eb7c 100644
> --- a/kernel/bpf/ringbuf.c
> +++ b/kernel/bpf/ringbuf.c
> @@ -291,16 +291,33 @@ static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
>  }
>
>  static __poll_t ringbuf_map_poll(struct bpf_map *map, struct file *filp,
> -                                struct poll_table_struct *pts)
> +                                struct poll_table_struct *pts,
> +                                bool kernel_producer)
>  {
>         struct bpf_ringbuf_map *rb_map;
> +       bool buffer_empty;
>
>         rb_map = container_of(map, struct bpf_ringbuf_map, map);
>         poll_wait(filp, &rb_map->rb->waitq, pts);
>
> -       if (ringbuf_avail_data_sz(rb_map->rb))
> -               return EPOLLIN | EPOLLRDNORM;
> -       return 0;
> +       buffer_empty = !ringbuf_avail_data_sz(rb_map->rb);
> +
> +       if (kernel_producer)
> +               return buffer_empty ? 0 : EPOLLIN | EPOLLRDNORM;
> +       else
> +               return buffer_empty ? EPOLLOUT | EPOLLWRNORM : 0;
> +}
> +
> +static __poll_t ringbuf_map_poll_kern(struct bpf_map *map, struct file *filp,
> +                                     struct poll_table_struct *pts)
> +{
> +       return ringbuf_map_poll(map, filp, pts, true);
> +}
> +
> +static __poll_t ringbuf_map_poll_user(struct bpf_map *map, struct file *filp,
> +                                     struct poll_table_struct *pts)
> +{
> +       return ringbuf_map_poll(map, filp, pts, false);
>  }

This is an even stronger case where I think we should keep two
implementations completely separate. Please keep existing
ringbuf_map_poll as is and just add a user variant as a separate
implementation

>
>  BTF_ID_LIST_SINGLE(ringbuf_map_btf_ids, struct, bpf_ringbuf_map)
> @@ -309,7 +326,7 @@ const struct bpf_map_ops ringbuf_map_ops = {
>         .map_alloc = ringbuf_map_alloc,
>         .map_free = ringbuf_map_free,
>         .map_mmap = ringbuf_map_mmap_kern,
> -       .map_poll = ringbuf_map_poll,
> +       .map_poll = ringbuf_map_poll_kern,
>         .map_lookup_elem = ringbuf_map_lookup_elem,
>         .map_update_elem = ringbuf_map_update_elem,
>         .map_delete_elem = ringbuf_map_delete_elem,
> @@ -323,6 +340,7 @@ const struct bpf_map_ops user_ringbuf_map_ops = {
>         .map_alloc = ringbuf_map_alloc,
>         .map_free = ringbuf_map_free,
>         .map_mmap = ringbuf_map_mmap_user,
> +       .map_poll = ringbuf_map_poll_user,
>         .map_lookup_elem = ringbuf_map_lookup_elem,
>         .map_update_elem = ringbuf_map_update_elem,
>         .map_delete_elem = ringbuf_map_delete_elem,
> @@ -605,3 +623,132 @@ const struct bpf_func_proto bpf_ringbuf_discard_dynptr_proto = {
>         .arg1_type      = ARG_PTR_TO_DYNPTR | DYNPTR_TYPE_RINGBUF | OBJ_RELEASE,
>         .arg2_type      = ARG_ANYTHING,
>  };
> +
> +static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample,
> +                                  u32 *size)

"poll" part is quite confusing as it has nothing to do with epoll and
the other poll implementation. Maybe "peek"? It peeks into the next sample
without consuming it, which seems appropriate.

nit: this declaration can also stay on single line

> +{
> +       unsigned long cons_pos, prod_pos;
> +       u32 sample_len, total_len;
> +       u32 *hdr;
> +       int err;
> +       int busy = 0;

nit: combine matching types:

u32 sample_len, total_len, *hdr;
int err, busy = 0;

> +
> +       /* If another consumer is already consuming a sample, wait for them to
> +        * finish.
> +        */
> +       if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
> +               return -EBUSY;
> +
> +       /* Synchronizes with smp_store_release() in user-space. */
> +       prod_pos = smp_load_acquire(&rb->producer_pos);

I think we should enforce that prod_pos is a multiple of 8

> +       /* Synchronizes with smp_store_release() in
> +        * __bpf_user_ringbuf_sample_release().
> +        */
> +       cons_pos = smp_load_acquire(&rb->consumer_pos);
> +       if (cons_pos >= prod_pos) {
> +               atomic_set(&rb->busy, 0);
> +               return -ENODATA;
> +       }
> +
> +       hdr = (u32 *)((uintptr_t)rb->data + (cons_pos & rb->mask));
> +       sample_len = *hdr;

do we need smp_load_acquire() here? libbpf's ring_buffer
implementation uses load_acquire here


> +
> +       /* Check that the sample can fit into a dynptr. */
> +       err = bpf_dynptr_check_size(sample_len);
> +       if (err) {
> +               atomic_set(&rb->busy, 0);
> +               return err;
> +       }
> +
> +       /* Check that the sample fits within the region advertised by the
> +        * consumer position.
> +        */
> +       total_len = sample_len + BPF_RINGBUF_HDR_SZ;

round up to closest multiple of 8? All the pointers into ringbuf data
area should be 8-byte aligned

> +       if (total_len > prod_pos - cons_pos) {
> +               atomic_set(&rb->busy, 0);
> +               return -E2BIG;
> +       }
> +
> +       /* Check that the sample fits within the data region of the ring buffer.
> +        */
> +       if (total_len > rb->mask + 1) {
> +               atomic_set(&rb->busy, 0);
> +               return -E2BIG;
> +       }
> +
> +       /* consumer_pos is updated when the sample is released.
> +        */
> +

nit: unnecessary empty line

and please keep single-line comments as single-line, no */ on separate
line in such case

> +       *sample = (void *)((uintptr_t)rb->data +
> +                          ((cons_pos + BPF_RINGBUF_HDR_SZ) & rb->mask));
> +       *size = sample_len;
> +
> +       return 0;
> +}
> +
> +static void
> +__bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
> +                                 u64 flags)

try to keep single lines if they are under 100 characters

> +{
> +
> +

empty lines, why?

> +       /* To release the ringbuffer, just increment the producer position to
> +        * signal that a new sample can be consumed. The busy bit is cleared by
> +        * userspace when posting a new sample to the ringbuffer.
> +        */
> +       smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
> +                         BPF_RINGBUF_HDR_SZ);
> +
> +       if (flags & BPF_RB_FORCE_WAKEUP || !(flags & BPF_RB_NO_WAKEUP))

please use () around bit operator expressions

> +               irq_work_queue(&rb->work);
> +
> +       atomic_set(&rb->busy, 0);

set busy before scheduling irq_work? why delaying?

> +}
> +
> +BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,
> +          void *, callback_fn, void *, callback_ctx, u64, flags)
> +{
> +       struct bpf_ringbuf *rb;
> +       long num_samples = 0, ret = 0;
> +       int err;
> +       bpf_callback_t callback = (bpf_callback_t)callback_fn;
> +       u64 wakeup_flags = BPF_RB_NO_WAKEUP | BPF_RB_FORCE_WAKEUP;
> +
> +       if (unlikely(flags & ~wakeup_flags))
> +               return -EINVAL;
> +
> +       /* The two wakeup flags are mutually exclusive. */
> +       if (unlikely((flags & wakeup_flags) == wakeup_flags))
> +               return -EINVAL;

we don't check this for existing ringbuf, so maybe let's keep it
consistent? FORCE_WAKEUP is just stronger than NO_WAKEUP

> +
> +       rb = container_of(map, struct bpf_ringbuf_map, map)->rb;
> +       do {
> +               u32 size;
> +               void *sample;
> +
> +               err = __bpf_user_ringbuf_poll(rb, &sample, &size);
> +

nit: don't keep empty line between function call and error check

> +               if (!err) {

so -ENODATA is a special error and you should stop if you get that.
But for any other error we should propagate error back, not just
silently consuming it

maybe

err = ...
if (err) {
    if (err == -ENODATA)
      break;
    return err;
}

?

> +                       struct bpf_dynptr_kern dynptr;
> +
> +                       bpf_dynptr_init(&dynptr, sample, BPF_DYNPTR_TYPE_LOCAL,
> +                                       0, size);

we should try to avoid unnecessary re-initialization of dynptr, let's
initialize it once and then just update data and size field inside the
loop?


> +                       ret = callback((u64)&dynptr,
> +                                       (u64)(uintptr_t)callback_ctx, 0, 0, 0);
> +
> +                       __bpf_user_ringbuf_sample_release(rb, size, flags);
> +                       num_samples++;
> +               }
> +       } while (err == 0 && num_samples < 4096 && ret == 0);
> +

4096 is pretty arbitrary. Definitely worth noting it somewhere and it
seems somewhat low, tbh...

ideally we'd cond_resched() from time to time, but that would require
BPF program to be sleepable, so we can't do that :(


> +       return num_samples;
> +}
> +
> +const struct bpf_func_proto bpf_user_ringbuf_drain_proto = {
> +       .func           = bpf_user_ringbuf_drain,
> +       .ret_type       = RET_INTEGER,
> +       .arg1_type      = ARG_CONST_MAP_PTR,
> +       .arg2_type      = ARG_PTR_TO_FUNC,
> +       .arg3_type      = ARG_PTR_TO_STACK_OR_NULL,
> +       .arg4_type      = ARG_ANYTHING,
> +};
> diff --git a/kernel/bpf/verifier.c b/kernel/bpf/verifier.c
> index 4a9562c62af0..99dfdc3ed187 100644
> --- a/kernel/bpf/verifier.c
> +++ b/kernel/bpf/verifier.c
> @@ -555,6 +555,7 @@ static const char *reg_type_str(struct bpf_verifier_env *env,
>                 [PTR_TO_BUF]            = "buf",
>                 [PTR_TO_FUNC]           = "func",
>                 [PTR_TO_MAP_KEY]        = "map_key",
> +               [PTR_TO_DYNPTR]         = "dynptr_ptr",
>         };
>
>         if (type & PTR_MAYBE_NULL) {
> @@ -5656,6 +5657,12 @@ static const struct bpf_reg_types stack_ptr_types = { .types = { PTR_TO_STACK }
>  static const struct bpf_reg_types const_str_ptr_types = { .types = { PTR_TO_MAP_VALUE } };
>  static const struct bpf_reg_types timer_types = { .types = { PTR_TO_MAP_VALUE } };
>  static const struct bpf_reg_types kptr_types = { .types = { PTR_TO_MAP_VALUE } };
> +static const struct bpf_reg_types dynptr_types = {
> +       .types = {
> +               PTR_TO_STACK,
> +               PTR_TO_DYNPTR | MEM_ALLOC | DYNPTR_TYPE_LOCAL,
> +       }
> +};
>
>  static const struct bpf_reg_types *compatible_reg_types[__BPF_ARG_TYPE_MAX] = {
>         [ARG_PTR_TO_MAP_KEY]            = &map_key_value_types,
> @@ -5682,7 +5689,7 @@ static const struct bpf_reg_types *compatible_reg_types[__BPF_ARG_TYPE_MAX] = {
>         [ARG_PTR_TO_CONST_STR]          = &const_str_ptr_types,
>         [ARG_PTR_TO_TIMER]              = &timer_types,
>         [ARG_PTR_TO_KPTR]               = &kptr_types,
> -       [ARG_PTR_TO_DYNPTR]             = &stack_ptr_types,
> +       [ARG_PTR_TO_DYNPTR]             = &dynptr_types,
>  };
>
>  static int check_reg_type(struct bpf_verifier_env *env, u32 regno,
> @@ -6025,6 +6032,13 @@ static int check_func_arg(struct bpf_verifier_env *env, u32 arg,
>                 err = check_mem_size_reg(env, reg, regno, true, meta);
>                 break;
>         case ARG_PTR_TO_DYNPTR:
> +               /* We only need to check for initialized / uninitialized helper
> +                * dynptr args if the dynptr is not MEM_ALLOC, as the assumption
> +                * is that if it is, that a helper function initialized the
> +                * dynptr on behalf of the BPF program.
> +                */
> +               if (reg->type & MEM_ALLOC)

Isn't PTR_TO_DYNPTR enough indication? Do we need MEM_ALLOC modifier?
Normal dynptr created and used inside BPF program on the stack are
actually PTR_TO_STACK, so that should be enough distinction? Or am I
missing something?

> +                       break;
>                 if (arg_type & MEM_UNINIT) {
>                         if (!is_dynptr_reg_valid_uninit(env, reg)) {
>                                 verbose(env, "Dynptr has to be an uninitialized dynptr\n");
> @@ -6197,7 +6211,9 @@ static int check_map_func_compatibility(struct bpf_verifier_env *env,
>                         goto error;
>                 break;
>         case BPF_MAP_TYPE_USER_RINGBUF:
> -               goto error;
> +               if (func_id != BPF_FUNC_user_ringbuf_drain)
> +                       goto error;
> +               break;
>         case BPF_MAP_TYPE_STACK_TRACE:
>                 if (func_id != BPF_FUNC_get_stackid)
>                         goto error;
> @@ -6317,6 +6333,10 @@ static int check_map_func_compatibility(struct bpf_verifier_env *env,
>                 if (map->map_type != BPF_MAP_TYPE_RINGBUF)
>                         goto error;
>                 break;
> +       case BPF_FUNC_user_ringbuf_drain:
> +               if (map->map_type != BPF_MAP_TYPE_USER_RINGBUF)
> +                       goto error;
> +               break;
>         case BPF_FUNC_get_stackid:
>                 if (map->map_type != BPF_MAP_TYPE_STACK_TRACE)
>                         goto error;
> @@ -6901,6 +6921,29 @@ static int set_find_vma_callback_state(struct bpf_verifier_env *env,
>         return 0;
>  }
>
> +static int set_user_ringbuf_callback_state(struct bpf_verifier_env *env,
> +                                          struct bpf_func_state *caller,
> +                                          struct bpf_func_state *callee,
> +                                          int insn_idx)
> +{
> +       /* bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void
> +        *                        callback_ctx, u64 flags);
> +        * callback_fn(struct bpf_dynptr_t* dynptr, void *callback_ctx);
> +        */
> +       __mark_reg_not_init(env, &callee->regs[BPF_REG_0]);
> +       callee->regs[BPF_REG_1].type = PTR_TO_DYNPTR | DYNPTR_TYPE_LOCAL | MEM_ALLOC;
> +       __mark_reg_known_zero(&callee->regs[BPF_REG_1]);
> +       callee->regs[BPF_REG_2] = caller->regs[BPF_REG_3];
> +
> +       /* unused */
> +       __mark_reg_not_init(env, &callee->regs[BPF_REG_3]);
> +       __mark_reg_not_init(env, &callee->regs[BPF_REG_4]);
> +       __mark_reg_not_init(env, &callee->regs[BPF_REG_5]);
> +
> +       callee->in_callback_fn = true;
> +       return 0;
> +}
> +
>  static int prepare_func_exit(struct bpf_verifier_env *env, int *insn_idx)
>  {
>         struct bpf_verifier_state *state = env->cur_state;
> @@ -7345,6 +7388,10 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
>                         }
>                 }
>                 break;
> +       case BPF_FUNC_user_ringbuf_drain:
> +               err = __check_func_call(env, insn, insn_idx_p, meta.subprogno,
> +                                       set_user_ringbuf_callback_state);
> +               break;
>         }
>
>         if (err)
> @@ -7477,11 +7524,15 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
>                 /* Find the id of the dynptr we're acquiring a reference to */
>                 for (i = 0; i < MAX_BPF_FUNC_REG_ARGS; i++) {
>                         if (arg_type_is_dynptr(fn->arg_type[i])) {
> +                               struct bpf_reg_state *reg = &regs[BPF_REG_1 + i];
> +
>                                 if (dynptr_id) {
>                                         verbose(env, "verifier internal error: multiple dynptr args in func\n");
>                                         return -EFAULT;
>                                 }
> -                               dynptr_id = stack_slot_get_id(env, &regs[BPF_REG_1 + i]);
> +
> +                               if (!(reg->type & MEM_ALLOC))
> +                                       dynptr_id = stack_slot_get_id(env, reg);

this part has changed in bpf-next

>                         }
>                 }
>                 /* For release_reference() */
> diff --git a/tools/include/uapi/linux/bpf.h b/tools/include/uapi/linux/bpf.h
> index ce0ce713faf9..e5e4ab12177c 100644
> --- a/tools/include/uapi/linux/bpf.h
> +++ b/tools/include/uapi/linux/bpf.h
> @@ -5332,6 +5332,13 @@ union bpf_attr {
>   *             **-EACCES** if the SYN cookie is not valid.
>   *
>   *             **-EPROTONOSUPPORT** if CONFIG_IPV6 is not builtin.
> + *
> + * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
> + *     Description
> + *             Drain samples from the specified user ringbuffer, and invoke the
> + *             provided callback for each such sample.
> + *     Return
> + *             An error if a sample could not be drained.
>   */
>  #define __BPF_FUNC_MAPPER(FN)          \
>         FN(unspec),                     \
> @@ -5542,6 +5549,7 @@ union bpf_attr {
>         FN(tcp_raw_gen_syncookie_ipv6), \
>         FN(tcp_raw_check_syncookie_ipv4),       \
>         FN(tcp_raw_check_syncookie_ipv6),       \
> +       FN(bpf_user_ringbuf_drain),     \
>         /* */
>
>  /* integer value in 'imm' field of BPF_CALL instruction selects which helper
> --
> 2.30.2
>
David Vernet Aug. 12, 2022, 12:01 a.m. UTC | #3
On Thu, Aug 11, 2022 at 04:22:43PM -0700, Andrii Nakryiko wrote:
> On Mon, Aug 8, 2022 at 8:54 AM David Vernet <void@manifault.com> wrote:
> >
> > Now that we have a BPF_MAP_TYPE_USER_RINGBUF map type, we need to add a
> > helper function that allows BPF programs to drain samples from the ring
> > buffer, and invoke a callback for each. This patch adds a new
> > bpf_user_ringbuf_drain() helper that provides this abstraction.
> >
> > In order to support this, we needed to also add a new PTR_TO_DYNPTR
> > register type to reflect a dynptr that was allocated by a helper function
> > and passed to a BPF program. The verifier currently only supports
> > PTR_TO_DYNPTR registers that are also DYNPTR_TYPE_LOCAL and MEM_ALLOC.
> 
> This commit message is a bit too laconic. There are a lot of
> implications in various parts of this patch; it would be great to
> highlight the most important ones. Please consider elaborating a bit more.

Not a problem, I'll do this in v3.
David Vernet Aug. 12, 2022, 12:46 a.m. UTC | #4
On Thu, Aug 11, 2022 at 04:22:43PM -0700, Andrii Nakryiko wrote:
> On Mon, Aug 8, 2022 at 8:54 AM David Vernet <void@manifault.com> wrote:
> >
> > Now that we have a BPF_MAP_TYPE_USER_RINGBUF map type, we need to add a
> > helper function that allows BPF programs to drain samples from the ring
> > buffer, and invoke a callback for each. This patch adds a new
> > bpf_user_ringbuf_drain() helper that provides this abstraction.
> >
> > In order to support this, we needed to also add a new PTR_TO_DYNPTR
> > register type to reflect a dynptr that was allocated by a helper function
> > and passed to a BPF program. The verifier currently only supports
> > PTR_TO_DYNPTR registers that are also DYNPTR_TYPE_LOCAL and MEM_ALLOC.
> 
> This commit message is a bit too laconic. There are a lot of
> implications in various parts of this patch; it would be great to
> highlight the most important ones. Please consider elaborating a bit more.

Argh, I sent my last email too early and it only responded to this comment.
I'll do this in v3, as mentioned in my other email.

> > diff --git a/include/uapi/linux/bpf.h b/include/uapi/linux/bpf.h
> > index a341f877b230..ca125648d7fd 100644
> > --- a/include/uapi/linux/bpf.h
> > +++ b/include/uapi/linux/bpf.h
> > @@ -5332,6 +5332,13 @@ union bpf_attr {
> >   *             **-EACCES** if the SYN cookie is not valid.
> >   *
> >   *             **-EPROTONOSUPPORT** if CONFIG_IPV6 is not builtin.
> > + *
> > + * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
> > + *     Description
> > + *             Drain samples from the specified user ringbuffer, and invoke the
> > + *             provided callback for each such sample.
> 
> please specify what's the expected signature of callback_fn

Will do, unfortunately we're inconsistent in doing this in other helper
functions, so it wasn't clear from context.

> > + *     Return
> > + *             An error if a sample could not be drained.
> 
> Negative error, right? And might be worth it briefly describing what
> are the situation(s) when you won't be able to drain a sample?
> 
> Also please clarify if having no sample to drain is an error or not?
> 
> It's also important to specify that if no error (and -ENODATA
> shouldn't be an error, actually) occurred then we get number of
> consumed samples back.

Agreed, I'll add more data here in the next version.
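
For instance, the helper description could spell out the callback signature
and the return semantics along these lines (wording is only a sketch):

         * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
         *     Description
         *             Drain samples from the specified user ringbuffer, and invoke
         *             the provided callback for each such sample:
         *
         *             long (*callback_fn)(struct bpf_dynptr *dynptr, void *ctx);
         *     Return
         *             The number of drained samples (which can be 0 if the ring
         *             buffer was empty), or a negative error such as **-EBUSY**
         *             if another consumer is already draining, or **-EINVAL** if
         *             invalid *flags* were passed.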

[...]

> > +
> > +static __poll_t ringbuf_map_poll_kern(struct bpf_map *map, struct file *filp,
> > +                                     struct poll_table_struct *pts)
> > +{
> > +       return ringbuf_map_poll(map, filp, pts, true);
> > +}
> > +
> > +static __poll_t ringbuf_map_poll_user(struct bpf_map *map, struct file *filp,
> > +                                     struct poll_table_struct *pts)
> > +{
> > +       return ringbuf_map_poll(map, filp, pts, false);
> >  }
> 
> This is an even stronger case where I think we should keep two
> implementations completely separate. Please keep existing
> ringbuf_map_poll as is and just add a user variant as a separate
> implementation

Agreed, I'll split both this and the mmaps into separate functions.

> >  BTF_ID_LIST_SINGLE(ringbuf_map_btf_ids, struct, bpf_ringbuf_map)
> > @@ -309,7 +326,7 @@ const struct bpf_map_ops ringbuf_map_ops = {
> >         .map_alloc = ringbuf_map_alloc,
> >         .map_free = ringbuf_map_free,
> >         .map_mmap = ringbuf_map_mmap_kern,
> > -       .map_poll = ringbuf_map_poll,
> > +       .map_poll = ringbuf_map_poll_kern,
> >         .map_lookup_elem = ringbuf_map_lookup_elem,
> >         .map_update_elem = ringbuf_map_update_elem,
> >         .map_delete_elem = ringbuf_map_delete_elem,
> > @@ -323,6 +340,7 @@ const struct bpf_map_ops user_ringbuf_map_ops = {
> >         .map_alloc = ringbuf_map_alloc,
> >         .map_free = ringbuf_map_free,
> >         .map_mmap = ringbuf_map_mmap_user,
> > +       .map_poll = ringbuf_map_poll_user,
> >         .map_lookup_elem = ringbuf_map_lookup_elem,
> >         .map_update_elem = ringbuf_map_update_elem,
> >         .map_delete_elem = ringbuf_map_delete_elem,
> > @@ -605,3 +623,132 @@ const struct bpf_func_proto bpf_ringbuf_discard_dynptr_proto = {
> >         .arg1_type      = ARG_PTR_TO_DYNPTR | DYNPTR_TYPE_RINGBUF | OBJ_RELEASE,
> >         .arg2_type      = ARG_ANYTHING,
> >  };
> > +
> > +static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample,
> > +                                  u32 *size)
> 
> "poll" part is quite confusing as it has nothing to do with epoll and
> the other poll implementation. Maybe "peek"? It peeks into the next sample
> without consuming it, which seems appropriate.
> 
> nit: this declaration can also stay on single line

Yeah, I agree that "poll" is confusing. I think "peek" is a good option. I
was also considering "consume", but I don't think that makes sense given
that we're not actually done consuming the sample until we release it.

> > +{
> > +       unsigned long cons_pos, prod_pos;
> > +       u32 sample_len, total_len;
> > +       u32 *hdr;
> > +       int err;
> > +       int busy = 0;
> 
> nit: combine matching types:
> 
> u32 sample_len, total_len, *hdr;
> int err, busy = 0;

Ack.

> > +
> > +       /* If another consumer is already consuming a sample, wait for them to
> > +        * finish.
> > +        */
> > +       if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
> > +               return -EBUSY;
> > +
> > +       /* Synchronizes with smp_store_release() in user-space. */
> > +       prod_pos = smp_load_acquire(&rb->producer_pos);
> 
> I think we should enforce that prod_pos is a multiple of 8

Agreed, I'll add a check and selftest for this.
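
Something along these lines in the peek helper would cover it (sketch):

        /* Synchronizes with smp_store_release() in user-space. */
        prod_pos = smp_load_acquire(&rb->producer_pos);
        if (prod_pos % 8) {
                atomic_set(&rb->busy, 0);
                return -EINVAL;
        }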

> > +       /* Synchronizes with smp_store_release() in
> > +        * __bpf_user_ringbuf_sample_release().
> > +        */
> > +       cons_pos = smp_load_acquire(&rb->consumer_pos);
> > +       if (cons_pos >= prod_pos) {
> > +               atomic_set(&rb->busy, 0);
> > +               return -ENODATA;
> > +       }
> > +
> > +       hdr = (u32 *)((uintptr_t)rb->data + (cons_pos & rb->mask));
> > +       sample_len = *hdr;
> 
> do we need smp_load_acquire() here? libbpf's ring_buffer
> implementation uses load_acquire here

I thought about this when I was first adding the logic, but I can't
convince myself that it's necessary and wasn't able to figure out why we
did a load acquire on the len in libbpf. The kernel doesn't do a store
release on the header, so I'm not sure what the load acquire in libbpf
actually accomplishes. I could certainly be missing something, but I
_think_ the important thing is that we have load-acquire / store-release
pairs for the consumer and producer positions.
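
As a purely hypothetical illustration of that pairing, a minimal user-space
publisher against the mmap'ed producer page could look like the following;
wrap-around, 8-byte rounding, and the real submission API added later in this
series are all omitted, and every name here is illustrative:

        #include <stdint.h>
        #include <string.h>

        /* data, mask and producer_pos point into the mmap'ed user ringbuf region. */
        static void publish_sample(char *data, uint64_t mask, uint64_t *producer_pos,
                                   const void *src, uint32_t len)
        {
                uint64_t prod = *producer_pos;  /* only the producer writes this */
                uint32_t *hdr = (uint32_t *)(data + (prod & mask));

                *hdr = len;                     /* 8-byte header holds the length */
                memcpy((char *)hdr + 8, src, len);

                /* Pairs with smp_load_acquire(&rb->producer_pos) in the kernel. */
                __atomic_store_n(producer_pos, prod + 8 + len, __ATOMIC_RELEASE);
        }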

> > +       /* Check that the sample can fit into a dynptr. */
> > +       err = bpf_dynptr_check_size(sample_len);
> > +       if (err) {
> > +               atomic_set(&rb->busy, 0);
> > +               return err;
> > +       }
> > +
> > +       /* Check that the sample fits within the region advertised by the
> > +        * consumer position.
> > +        */
> > +       total_len = sample_len + BPF_RINGBUF_HDR_SZ;
> 
> round up to closest multiple of 8? All the pointers into ringbuf data
> area should be 8-byte aligned

Will do.
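
i.e. something like (sketch):

        /* Keep the next sample header 8-byte aligned. */
        total_len = round_up(sample_len + BPF_RINGBUF_HDR_SZ, 8);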

> > +       if (total_len > prod_pos - cons_pos) {
> > +               atomic_set(&rb->busy, 0);
> > +               return -E2BIG;
> > +       }
> > +
> > +       /* Check that the sample fits within the data region of the ring buffer.
> > +        */
> > +       if (total_len > rb->mask + 1) {
> > +               atomic_set(&rb->busy, 0);
> > +               return -E2BIG;
> > +       }
> > +
> > +       /* consumer_pos is updated when the sample is released.
> > +        */
> > +
> 
> nit: unnecessary empty line
> 
> and please keep single-line comments as single-line, no */ on separate
> line in such case

Will do.

> > +       *sample = (void *)((uintptr_t)rb->data +
> > +                          ((cons_pos + BPF_RINGBUF_HDR_SZ) & rb->mask));
> > +       *size = sample_len;
> > +
> > +       return 0;
> > +}
> > +
> > +static void
> > +__bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
> > +                                 u64 flags)
> 
> try to keep single lines if they are under 100 characters

Ack, this seems to really differ by subsystem. I'll follow this norm for
BPF.

> > +{
> > +
> > +
> 
> empty lines, why?

Apologies, thanks for catching this.

> > +       /* To release the ringbuffer, just increment the producer position to
> > +        * signal that a new sample can be consumed. The busy bit is cleared by
> > +        * userspace when posting a new sample to the ringbuffer.
> > +        */
> > +       smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
> > +                         BPF_RINGBUF_HDR_SZ);
> > +
> > +       if (flags & BPF_RB_FORCE_WAKEUP || !(flags & BPF_RB_NO_WAKEUP))
> 
> please use () around bit operator expressions

Ack.

> > +               irq_work_queue(&rb->work);
> > +
> > +       atomic_set(&rb->busy, 0);
> 
> set busy before scheduling irq_work? why delaying?

Yeah, I thought about this. I don't think there's any problem with clearing
busy before we schedule the irq_work_queue(). I elected to do this to err
on the side of simpler logic until we observe contention, but yeah, let me
just do the more performant thing here.

> > +}
> > +
> > +BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,
> > +          void *, callback_fn, void *, callback_ctx, u64, flags)
> > +{
> > +       struct bpf_ringbuf *rb;
> > +       long num_samples = 0, ret = 0;
> > +       int err;
> > +       bpf_callback_t callback = (bpf_callback_t)callback_fn;
> > +       u64 wakeup_flags = BPF_RB_NO_WAKEUP | BPF_RB_FORCE_WAKEUP;
> > +
> > +       if (unlikely(flags & ~wakeup_flags))
> > +               return -EINVAL;
> > +
> > +       /* The two wakeup flags are mutually exclusive. */
> > +       if (unlikely((flags & wakeup_flags) == wakeup_flags))
> > +               return -EINVAL;
> 
> we don't check this for existing ringbuf, so maybe let's keep it
> consistent? FORCE_WAKEUP is just stronger than NO_WAKEUP

Ack.

> > +
> > +       rb = container_of(map, struct bpf_ringbuf_map, map)->rb;
> > +       do {
> > +               u32 size;
> > +               void *sample;
> > +
> > +               err = __bpf_user_ringbuf_poll(rb, &sample, &size);
> > +
> 
> nit: don't keep empty line between function call and error check

Ack.

> > +               if (!err) {
> 
> so -ENODATA is a special error and you should stop if you get that.
> But for any other error we should propagate error back, not just
> silently consuming it
> 
> maybe
> 
> err = ...
> if (err) {
>     if (err == -ENODATA)
>       break;
>     return err;
> }
> 
> ?

Agreed, I'll fix this.
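
With that, the drain loop could end up looking roughly like this (sketch,
using the "peek" rename discussed above; rb->busy is not held on any peek
failure, so propagating the error directly is safe):

        do {
                u32 size;
                void *sample;
                struct bpf_dynptr_kern dynptr;

                err = __bpf_user_ringbuf_peek(rb, &sample, &size);
                if (err) {
                        if (err == -ENODATA)
                                break;
                        return err;
                }

                bpf_dynptr_init(&dynptr, sample, BPF_DYNPTR_TYPE_LOCAL, 0, size);
                ret = callback((u64)(uintptr_t)&dynptr,
                               (u64)(uintptr_t)callback_ctx, 0, 0, 0);
                __bpf_user_ringbuf_sample_release(rb, size, flags);
                num_samples++;
        } while (num_samples < 4096 && ret == 0);

        return num_samples;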

> > +                       struct bpf_dynptr_kern dynptr;
> > +
> > +                       bpf_dynptr_init(&dynptr, sample, BPF_DYNPTR_TYPE_LOCAL,
> > +                                       0, size);
> 
> we should try to avoid unnecessary re-initialization of dynptr, let's
> initialize it once and then just update data and size field inside the
> loop?

Hmm ok, let me give that a try. 

> > +                       ret = callback((u64)&dynptr,
> > +                                       (u64)(uintptr_t)callback_ctx, 0, 0, 0);
> > +
> > +                       __bpf_user_ringbuf_sample_release(rb, size, flags);
> > +                       num_samples++;
> > +               }
> > +       } while (err == 0 && num_samples < 4096 && ret == 0);
> > +
> 
> 4096 is pretty arbitrary. Definitely worth noting it somewhere and it
> seems somewhat low, tbh...
> 
> ideally we'd cond_resched() from time to time, but that would require
> BPF program to be sleepable, so we can't do that :(

Yeah, I knew this would come up in discussion. I would love to do
cond_resched() here, but as you said, I don't think it's an option :-/ And
given the fact that we're calling back into the BPF program, we have to be
cognizant of things taking a while and clogging up the CPU. What do you
think is a more reasonable number than 4096?

[...]

> >         case ARG_PTR_TO_DYNPTR:
> > +               /* We only need to check for initialized / uninitialized helper
> > +                * dynptr args if the dynptr is not MEM_ALLOC, as the assumption
> > +                * is that if it is, that a helper function initialized the
> > +                * dynptr on behalf of the BPF program.
> > +                */
> > +               if (reg->type & MEM_ALLOC)
> 
> Isn't PTR_TO_DYNPTR enough indication? Do we need MEM_ALLOC modifier?
> Normal dynptr created and used inside BPF program on the stack are
> actually PTR_TO_STACK, so that should be enough distinction? Or am I
> missing something?

I think this would also work in the current state of the codebase, but IIUC
it relies on PTR_TO_STACK being the only way that a BPF program could ever
allocate a dynptr. I was trying to guard against the case of a helper being
added in the future that e.g. returned a dynamically allocated dynptr that
the caller would eventually need to release in another helper call.
MEM_ALLOC seems like the correct modifier to more generally denote that the
dynptr was externally allocated.  If you think this is overkill I'm totally
fine with removing MEM_ALLOC. We can always add it down the road if we add
a new helper that requires it.

[...]

> > @@ -7477,11 +7524,15 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
> >                 /* Find the id of the dynptr we're acquiring a reference to */
> >                 for (i = 0; i < MAX_BPF_FUNC_REG_ARGS; i++) {
> >                         if (arg_type_is_dynptr(fn->arg_type[i])) {
> > +                               struct bpf_reg_state *reg = &regs[BPF_REG_1 + i];
> > +
> >                                 if (dynptr_id) {
> >                                         verbose(env, "verifier internal error: multiple dynptr args in func\n");
> >                                         return -EFAULT;
> >                                 }
> > -                               dynptr_id = stack_slot_get_id(env, &regs[BPF_REG_1 + i]);
> > +
> > +                               if (!(reg->type & MEM_ALLOC))
> > +                                       dynptr_id = stack_slot_get_id(env, reg);
> 
> this part has changed in bpf-next

Yeah, this is all rewired in the new version I sent out in v2 (and will
send out in v3 when I apply your other suggestions).

[...]

Thanks for the thorough review!

- David
Andrii Nakryiko Aug. 16, 2022, 6:57 p.m. UTC | #5
On Thu, Aug 11, 2022 at 5:46 PM David Vernet <void@manifault.com> wrote:
>
> On Thu, Aug 11, 2022 at 04:22:43PM -0700, Andrii Nakryiko wrote:
> > On Mon, Aug 8, 2022 at 8:54 AM David Vernet <void@manifault.com> wrote:
> > >
> > > Now that we have a BPF_MAP_TYPE_USER_RINGBUF map type, we need to add a
> > > helper function that allows BPF programs to drain samples from the ring
> > > buffer, and invoke a callback for each. This patch adds a new
> > > bpf_user_ringbuf_drain() helper that provides this abstraction.
> > >
> > > In order to support this, we needed to also add a new PTR_TO_DYNPTR
> > > register type to reflect a dynptr that was allocated by a helper function
> > > and passed to a BPF program. The verifier currently only supports
> > > PTR_TO_DYNPTR registers that are also DYNPTR_TYPE_LOCAL and MEM_ALLOC.
> >
> > This commit message is a bit too laconic. There are a lot of
> > implications in various parts of this patch; it would be great to
> > highlight the most important ones. Please consider elaborating a bit more.
>
> Argh, I sent my last email too early and it only responded to this comment.
> I'll do this in v3, as mentioned in my other email.
>

no worries

> > > diff --git a/include/uapi/linux/bpf.h b/include/uapi/linux/bpf.h
> > > index a341f877b230..ca125648d7fd 100644
> > > --- a/include/uapi/linux/bpf.h
> > > +++ b/include/uapi/linux/bpf.h
> > > @@ -5332,6 +5332,13 @@ union bpf_attr {
> > >   *             **-EACCES** if the SYN cookie is not valid.
> > >   *
> > >   *             **-EPROTONOSUPPORT** if CONFIG_IPV6 is not builtin.
> > > + *
> > > + * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
> > > + *     Description
> > > + *             Drain samples from the specified user ringbuffer, and invoke the
> > > + *             provided callback for each such sample.
> >
> > please specify what's the expected signature of callback_fn
>
> Will do, unfortunately we're inconsistent in doing this in other helper
> functions, so it wasn't clear from context.

That means we missed it for other helpers. The idea was to always
specify the expected signature in the UAPI comment; ideally we fix all
the missing cases.

>
> > > + *     Return
> > > + *             An error if a sample could not be drained.
> >
> > Negative error, right? And might be worth it briefly describing what
> > are the situation(s) when you won't be able to drain a sample?
> >
> > Also please clarify if having no sample to drain is an error or not?
> >
> > It's also important to specify that if no error (and -ENODATA
> > shouldn't be an error, actually) occurred then we get number of
> > consumed samples back.
>
> Agreed, I'll add more data here in the next version.
>
> [...]
>
> > > +
> > > +static __poll_t ringbuf_map_poll_kern(struct bpf_map *map, struct file *filp,
> > > +                                     struct poll_table_struct *pts)
> > > +{
> > > +       return ringbuf_map_poll(map, filp, pts, true);
> > > +}
> > > +
> > > +static __poll_t ringbuf_map_poll_user(struct bpf_map *map, struct file *filp,
> > > +                                     struct poll_table_struct *pts)
> > > +{
> > > +       return ringbuf_map_poll(map, filp, pts, false);
> > >  }
> >
> > This is an even stronger case where I think we should keep two
> > implementations completely separate. Please keep existing
> > ringbuf_map_poll as is and just add a user variant as a separate
> > implementation
>
> Agreed, I'll split both this and the mmaps into separate functions.
>
> > >  BTF_ID_LIST_SINGLE(ringbuf_map_btf_ids, struct, bpf_ringbuf_map)
> > > @@ -309,7 +326,7 @@ const struct bpf_map_ops ringbuf_map_ops = {
> > >         .map_alloc = ringbuf_map_alloc,
> > >         .map_free = ringbuf_map_free,
> > >         .map_mmap = ringbuf_map_mmap_kern,
> > > -       .map_poll = ringbuf_map_poll,
> > > +       .map_poll = ringbuf_map_poll_kern,
> > >         .map_lookup_elem = ringbuf_map_lookup_elem,
> > >         .map_update_elem = ringbuf_map_update_elem,
> > >         .map_delete_elem = ringbuf_map_delete_elem,
> > > @@ -323,6 +340,7 @@ const struct bpf_map_ops user_ringbuf_map_ops = {
> > >         .map_alloc = ringbuf_map_alloc,
> > >         .map_free = ringbuf_map_free,
> > >         .map_mmap = ringbuf_map_mmap_user,
> > > +       .map_poll = ringbuf_map_poll_user,
> > >         .map_lookup_elem = ringbuf_map_lookup_elem,
> > >         .map_update_elem = ringbuf_map_update_elem,
> > >         .map_delete_elem = ringbuf_map_delete_elem,
> > > @@ -605,3 +623,132 @@ const struct bpf_func_proto bpf_ringbuf_discard_dynptr_proto = {
> > >         .arg1_type      = ARG_PTR_TO_DYNPTR | DYNPTR_TYPE_RINGBUF | OBJ_RELEASE,
> > >         .arg2_type      = ARG_ANYTHING,
> > >  };
> > > +
> > > +static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample,
> > > +                                  u32 *size)
> >
> > "poll" part is quite confusing as it has nothing to do with epoll and
> > the other poll implementation. Maybe "peek"? It peeks into the next sample
> > without consuming it, which seems appropriate.
> >
> > nit: this declaration can also stay on single line
>
> Yeah, I agree that "poll" is confusing. I think "peek" is a good option. I
> was also considering "consume", but I don't think that makes sense given
> that we're not actually done consuming the sample until we release it.

Exactly, consume is very bad as we don't "consume" it in the sense of
making that space available for reuse.

>
> > > +{
> > > +       unsigned long cons_pos, prod_pos;
> > > +       u32 sample_len, total_len;
> > > +       u32 *hdr;
> > > +       int err;
> > > +       int busy = 0;
> >
> > nit: combine matching types:
> >
> > u32 sample_len, total_len, *hdr;
> > int err, busy = 0;
>
> Ack.
>
> > > +
> > > +       /* If another consumer is already consuming a sample, wait for them to
> > > +        * finish.
> > > +        */
> > > +       if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
> > > +               return -EBUSY;
> > > +
> > > +       /* Synchronizes with smp_store_release() in user-space. */
> > > +       prod_pos = smp_load_acquire(&rb->producer_pos);
> >
> > I think we should enforce that prod_pos is a multiple of 8
>
> Agreed, I'll add a check and selftest for this.

Yep, consider also adding a few tests where user-space intentionally
breaks the contract, to make sure that the kernel stays intact (if you
already did that, apologies, I haven't looked at the selftests much).

>
> > > +       /* Synchronizes with smp_store_release() in
> > > +        * __bpf_user_ringbuf_sample_release().
> > > +        */
> > > +       cons_pos = smp_load_acquire(&rb->consumer_pos);
> > > +       if (cons_pos >= prod_pos) {
> > > +               atomic_set(&rb->busy, 0);
> > > +               return -ENODATA;
> > > +       }
> > > +
> > > +       hdr = (u32 *)((uintptr_t)rb->data + (cons_pos & rb->mask));
> > > +       sample_len = *hdr;
> >
> > do we need smp_load_acquire() here? libbpf's ring_buffer
> > implementation uses load_acquire here
>
> I thought about this when I was first adding the logic, but I can't
> convince myself that it's necessary and wasn't able to figure out why we
> did a load acquire on the len in libbpf. The kernel doesn't do a store
> release on the header, so I'm not sure what the load acquire in libbpf
> actually accomplishes. I could certainly be missing something, but I
> _think_ the important thing is that we have load-acquire / store-release
> pairs for the consumer and producer positions.

The kernel does xchg on len on its side, which is stronger than
smp_store_release() (I think it was Paul's suggestion instead of an
explicit memory barrier, but my memory of the exact reasons is hazy).

Right now this might not be necessary, but if we add support for busy
bit in a sample header, it will be closer to what BPF ringbuf is doing
right now, with producer position being a reservation pointer, but
sample itself won't be "readable" until sample header is updated and
its busy bit is unset.
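
For example, if user-space producers ever set BPF_RINGBUF_BUSY_BIT in the
sample header while a sample is still being written, the peek path could
honor it roughly like this (sketch of a possible future direction, not part
of this patch):

        sample_len = smp_load_acquire(hdr);
        if (sample_len & BPF_RINGBUF_BUSY_BIT) {
                /* Reserved but not yet committed by user-space. */
                atomic_set(&rb->busy, 0);
                return -ENODATA;
        }
        sample_len &= ~BPF_RINGBUF_DISCARD_BIT;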

>
> > > +       /* Check that the sample can fit into a dynptr. */
> > > +       err = bpf_dynptr_check_size(sample_len);
> > > +       if (err) {
> > > +               atomic_set(&rb->busy, 0);
> > > +               return err;
> > > +       }
> > > +
> > > +       /* Check that the sample fits within the region advertised by the
> > > +        * consumer position.
> > > +        */
> > > +       total_len = sample_len + BPF_RINGBUF_HDR_SZ;
> >
> > round up to closest multiple of 8? All the pointers into ringbuf data
> > area should be 8-byte aligned
>
> Will do.
>
> > > +       if (total_len > prod_pos - cons_pos) {
> > > +               atomic_set(&rb->busy, 0);
> > > +               return -E2BIG;
> > > +       }
> > > +
> > > +       /* Check that the sample fits within the data region of the ring buffer.
> > > +        */
> > > +       if (total_len > rb->mask + 1) {
> > > +               atomic_set(&rb->busy, 0);
> > > +               return -E2BIG;
> > > +       }
> > > +
> > > +       /* consumer_pos is updated when the sample is released.
> > > +        */
> > > +
> >
> > nit: unnecessary empty line
> >
> > and please keep single-line comments as single-line, no */ on separate
> > line in such case
>
> Will do.
>
> > > +       *sample = (void *)((uintptr_t)rb->data +
> > > +                          ((cons_pos + BPF_RINGBUF_HDR_SZ) & rb->mask));
> > > +       *size = sample_len;
> > > +
> > > +       return 0;
> > > +}
> > > +
> > > +static void
> > > +__bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
> > > +                                 u64 flags)
> >
> > try to keep single lines if they are under 100 characters
>
> Ack, this seems to really differ by subsystem. I'll follow this norm for
> BPF.

It's a relatively recent relaxation (like a year old or something) in
kernel coding style. Single-line usually has much better readability,
so I prefer that while staying within reasonable limits.

>
> > > +{
> > > +
> > > +
> >
> > empty lines, why?
>
> Apologies, thanks for catching this.
>
> > > +       /* To release the ringbuffer, just increment the producer position to
> > > +        * signal that a new sample can be consumed. The busy bit is cleared by
> > > +        * userspace when posting a new sample to the ringbuffer.
> > > +        */
> > > +       smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
> > > +                         BPF_RINGBUF_HDR_SZ);
> > > +
> > > +       if (flags & BPF_RB_FORCE_WAKEUP || !(flags & BPF_RB_NO_WAKEUP))
> >
> > please use () around bit operator expressions
>
> Ack.
>
> > > +               irq_work_queue(&rb->work);
> > > +
> > > +       atomic_set(&rb->busy, 0);
> >
> > set busy before scheduling irq_work? why delaying?
>
> Yeah, I thought about this. I don't think there's any problem with clearing
> busy before we schedule the irq_work_queue(). I elected to do this to err
> on the side of simpler logic until we observed contention, but yeah, let me
> just do the more performant thing here.

busy is like a global lock, so freeing it ASAP is good, so yeah,
unless there are some bad implications, let's do it early
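
I.e. in __bpf_user_ringbuf_sample_release(), just flip the order at the
tail of the function, something like (sketch, with the () fix from
earlier applied):

        atomic_set(&rb->busy, 0);

        if ((flags & BPF_RB_FORCE_WAKEUP) || !(flags & BPF_RB_NO_WAKEUP))
                irq_work_queue(&rb->work);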

>
> > > +}
> > > +
> > > +BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,
> > > +          void *, callback_fn, void *, callback_ctx, u64, flags)
> > > +{
> > > +       struct bpf_ringbuf *rb;
> > > +       long num_samples = 0, ret = 0;
> > > +       int err;
> > > +       bpf_callback_t callback = (bpf_callback_t)callback_fn;
> > > +       u64 wakeup_flags = BPF_RB_NO_WAKEUP | BPF_RB_FORCE_WAKEUP;
> > > +
> > > +       if (unlikely(flags & ~wakeup_flags))
> > > +               return -EINVAL;
> > > +
> > > +       /* The two wakeup flags are mutually exclusive. */
> > > +       if (unlikely((flags & wakeup_flags) == wakeup_flags))
> > > +               return -EINVAL;
> >
> > we don't check this for existing ringbuf, so maybe let's keep it
> > consistent? FORCE_WAKEUP is just stronger than NO_WAKEUP
>
> Ack.
>
> > > +
> > > +       rb = container_of(map, struct bpf_ringbuf_map, map)->rb;
> > > +       do {
> > > +               u32 size;
> > > +               void *sample;
> > > +
> > > +               err = __bpf_user_ringbuf_poll(rb, &sample, &size);
> > > +
> >
> > nit: don't keep empty line between function call and error check
>
> Ack.
>
> > > +               if (!err) {
> >
> > so -ENODATA is a special error and you should stop if you get that.
> > But for any other error we should propagate error back, not just
> > silently consuming it
> >
> > maybe
> >
> > err = ...
> > if (err) {
> >     if (err == -ENODATA)
> >       break;
> >     return err;
> > }
> >
> > ?
>
> Agreed, I'll fix this.
>
> > > +                       struct bpf_dynptr_kern dynptr;
> > > +
> > > +                       bpf_dynptr_init(&dynptr, sample, BPF_DYNPTR_TYPE_LOCAL,
> > > +                                       0, size);
> >
> > we should try to avoid unnecessary re-initialization of dynptr, let's
> > initialize it once and then just update data and size field inside the
> > loop?
>
> Hmm ok, let me give that a try.
>

discussed this offline, it might not be worth the hassle given dynptr
init is just setting 4 fields
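
(For reference, bpf_dynptr_init() in kernel/bpf/helpers.c is more or
less just:

void bpf_dynptr_init(struct bpf_dynptr_kern *ptr, void *data,
                     enum bpf_dynptr_type type, u32 offset, u32 size)
{
        ptr->data = data;
        ptr->offset = offset;
        ptr->size = size;
        bpf_dynptr_set_type(ptr, type);
}

so re-initializing it for each sample is cheap.)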

> > > +                       ret = callback((u64)&dynptr,
> > > +                                       (u64)(uintptr_t)callback_ctx, 0, 0, 0);
> > > +
> > > +                       __bpf_user_ringbuf_sample_release(rb, size, flags);
> > > +                       num_samples++;
> > > +               }
> > > +       } while (err == 0 && num_samples < 4096 && ret == 0);
> > > +
> >
> > 4096 is pretty arbitrary. Definitely worth noting it somewhere and it
> > seems somewhat low, tbh...
> >
> > ideally we'd cond_resched() from time to time, but that would require
> > BPF program to be sleepable, so we can't do that :(
>
> Yeah, I knew this would come up in discussion. I would love to do
> cond_resched() here, but as you said, I don't think it's an option :-/ And
> given the fact that we're calling back into the BPF program, we have to be
> cognizant of things taking a while and clogging up the CPU. What do you
> think is a more reasonable number than 4096?

I don't know, tbh, but 4096 seems pretty low. For bpf_loop() we allow
up to 2mln iterations. I'd bump it up to 64-128K range, probably. But
also please move it into some internal #define'd constant, not some
integer literal buried in the code

>
> [...]
>
> > >         case ARG_PTR_TO_DYNPTR:
> > > +               /* We only need to check for initialized / uninitialized helper
> > > +                * dynptr args if the dynptr is not MEM_ALLOC, as the assumption
> > > +                * is that if it is, that a helper function initialized the
> > > +                * dynptr on behalf of the BPF program.
> > > +                */
> > > +               if (reg->type & MEM_ALLOC)
> >
> > Isn't PTR_TO_DYNPTR enough indication? Do we need MEM_ALLOC modifier?
> > Normal dynptr created and used inside BPF program on the stack are
> > actually PTR_TO_STACK, so that should be enough distinction? Or am I
> > missing something?
>
> I think this would also work in the current state of the codebase, but IIUC
> it relies on PTR_TO_STACK being the only way that a BPF program could ever
> allocate a dynptr. I was trying to guard against the case of a helper being
> added in the future that e.g. returned a dynamically allocated dynptr that
> the caller would eventually need to release in another helper call.
> MEM_ALLOC seems like the correct modifier to more generally denote that the
> dynptr was externally allocated.  If you think this is overkill I'm totally
> fine with removing MEM_ALLOC. We can always add it down the road if we add
> a new helper that requires it.
>

Hm.. I don't see a huge need for more flags for this, so I'd keep it
simple for now and if in the future we do have such a use case, we'll
address it at that time?


> [...]
>
> > > @@ -7477,11 +7524,15 @@ static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
> > >                 /* Find the id of the dynptr we're acquiring a reference to */
> > >                 for (i = 0; i < MAX_BPF_FUNC_REG_ARGS; i++) {
> > >                         if (arg_type_is_dynptr(fn->arg_type[i])) {
> > > +                               struct bpf_reg_state *reg = &regs[BPF_REG_1 + i];
> > > +
> > >                                 if (dynptr_id) {
> > >                                         verbose(env, "verifier internal error: multiple dynptr args in func\n");
> > >                                         return -EFAULT;
> > >                                 }
> > > -                               dynptr_id = stack_slot_get_id(env, &regs[BPF_REG_1 + i]);
> > > +
> > > +                               if (!(reg->type & MEM_ALLOC))
> > > +                                       dynptr_id = stack_slot_get_id(env, reg);
> >
> > this part has changed in bpf-next
>
> Yeah, this is all rewired in the new version I sent out in v2 (and will
> send out in v3 when I apply your other suggestions).

Cool, thanks. It was a pretty tight race between my comments and your v2 :)

>
> [...]
>
> Thanks for the thorough review!
>
> - David
David Vernet Aug. 16, 2022, 10:14 p.m. UTC | #6
On Tue, Aug 16, 2022 at 11:57:10AM -0700, Andrii Nakryiko wrote:

[...]

> > > > diff --git a/include/uapi/linux/bpf.h b/include/uapi/linux/bpf.h
> > > > index a341f877b230..ca125648d7fd 100644
> > > > --- a/include/uapi/linux/bpf.h
> > > > +++ b/include/uapi/linux/bpf.h
> > > > @@ -5332,6 +5332,13 @@ union bpf_attr {
> > > >   *             **-EACCES** if the SYN cookie is not valid.
> > > >   *
> > > >   *             **-EPROTONOSUPPORT** if CONFIG_IPV6 is not builtin.
> > > > + *
> > > > + * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
> > > > + *     Description
> > > > + *             Drain samples from the specified user ringbuffer, and invoke the
> > > > + *             provided callback for each such sample.
> > >
> > > please specify what's the expected signature of callback_fn
> >
> > Will do, unfortunately we're inconsistent in doing this in other helper
> > functions, so it wasn't clear from context.
> 
> That means we missed it for other helpers. The idea was to always
> specify expected signature in UAPI comment, ideally we fix all the
> missing cases.

Agreed -- I'll take care of that as a follow-on patch-set.
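
For this helper specifically, based on how the callback is invoked and
verified in the patch, the documented signature would be along the
lines of:

long (*callback_fn)(struct bpf_dynptr *dynptr, void *ctx);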

> > Agreed, I'll add a check and selftest for this.
> 
> Yep, consider also adding a few tests where user-space intentionally
> breaks the contract to make sure that the kernel stays intact (if you
> already did that, apologies, I haven't looked at selftests much).

The only negative tests I currently have from user-space are verifying
that mapping permissions are correctly enforced. Happy to add some more
that test boundaries for other parts of the API -- I agree that's both
useful and prudent.

> > > > +       /* Synchronizes with smp_store_release() in
> > > > +        * __bpf_user_ringbuf_sample_release().
> > > > +        */
> > > > +       cons_pos = smp_load_acquire(&rb->consumer_pos);
> > > > +       if (cons_pos >= prod_pos) {
> > > > +               atomic_set(&rb->busy, 0);
> > > > +               return -ENODATA;
> > > > +       }
> > > > +
> > > > +       hdr = (u32 *)((uintptr_t)rb->data + (cons_pos & rb->mask));
> > > > +       sample_len = *hdr;
> > >
> > > do we need smp_load_acquire() here? libbpf's ring_buffer
> > > implementation uses load_acquire here
> >
> > I thought about this when I was first adding the logic, but I can't
> > convince myself that it's necessary and wasn't able to figure out why we
> > did a load acquire on the len in libbpf. The kernel doesn't do a store
> > release on the header, so I'm not sure what the load acquire in libbpf
> > actually accomplishes. I could certainly be missing something, but I
> > _think_ the important thing is that we have load-acquire / store-release
> > pairs for the consumer and producer positions.
> 
> kernel does xchg on len on the kernel side, which is stronger than
> smp_store_release (I think it was Paul's suggestion instead of doing
> explicit memory barrier, but my memories are hazy for exact reasons).

Hmmm, yeah, for the kernel-producer ringbuffer, I believe the explicit
memory barrier is unnecessary because:

o The smp_store_release() on producer_pos provides ordering w.r.t.
  producer_pos, and the write to hdr->len which includes the busy-bit
  should therefore be visible in libbpf, which does an
  smp_load_acquire().
o The xchg() when the sample is committed provides full barriers before
  and after, so the consumer is guaranteed to read the written contents of
  the sample IFF it also sees the BPF_RINGBUF_BUSY_BIT as unset.

I can't see any scenario in which a barrier would add synchronization not
already provided, though this stuff is tricky so I may have missed
something.
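
To spell out the pairing as I understand it (rough sketch, kernel
producer on the left, libbpf consumer on the right, with BUSY_BIT short
for BPF_RINGBUF_BUSY_BIT):

/* kernel producer                  libbpf consumer
 *
 * hdr->len = size | BUSY_BIT;
 * smp_store_release(&producer_pos);
 *                                  prod = smp_load_acquire(&producer_pos);
 * ...write sample payload...
 * xchg(&hdr->len, size);
 *                                  len = smp_load_acquire(&hdr->len);
 *                                  if (!(len & BUSY_BIT))
 *                                          read payload;
 */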

> Right now this might not be necessary, but if we add support for busy
> bit in a sample header, it will be closer to what BPF ringbuf is doing
> right now, with producer position being a reservation pointer, but
> sample itself won't be "readable" until sample header is updated and
> its busy bit is unset.

Regarding this patch, after thinking about this more I _think_ I do need
an xchg() (or equivalent operation with full barrier semantics) in
userspace when updating the producer_pos when committing the sample.
Which, after applying your suggestion (which I agree with) of supporting
BPF_RINGBUF_BUSY_BIT and BPF_RINGBUF_DISCARD_BIT from the get-go, would
similarly be an xchg() when setting the header, paired with an
smp_load_acquire() when reading the header in the kernel.

> > Yeah, I thought about this. I don't think there's any problem with clearing
> > busy before we schedule the irq_work_queue(). I elected to do this to err
> > on the side of simpler logic until we observed contention, but yeah, let me
> > just do the more performant thing here.
> 
> busy is like a global lock, so freeing it ASAP is good, so yeah,
> unless there are some bad implications, let's do it early

Ack.

[...]

> > > > +                       ret = callback((u64)&dynptr,
> > > > +                                       (u64)(uintptr_t)callback_ctx, 0, 0, 0);
> > > > +
> > > > +                       __bpf_user_ringbuf_sample_release(rb, size, flags);
> > > > +                       num_samples++;
> > > > +               }
> > > > +       } while (err == 0 && num_samples < 4096 && ret == 0);
> > > > +
> > >
> > > 4096 is pretty arbitrary. Definitely worth noting it somewhere and it
> > > seems somewhat low, tbh...
> > >
> > > ideally we'd cond_resched() from time to time, but that would require
> > > BPF program to be sleepable, so we can't do that :(
> >
> > Yeah, I knew this would come up in discussion. I would love to do
> > cond_resched() here, but as you said, I don't think it's an option :-/ And
> > given the fact that we're calling back into the BPF program, we have to be
> > cognizant of things taking a while and clogging up the CPU. What do you
> > think is a more reasonable number than 4096?
> 
> I don't know, tbh, but 4096 seems pretty low. For bpf_loop() we allow
> up to 2mln iterations. I'd bump it up to 64-128K range, probably. But
> also please move it into some internal #define'd constant, not some
> integer literal buried in the code

Sounds good to me. Maybe at some point we can make this configurable, or
something. If bpf_loop() allows a hard-coded number of iterations, I think
it's more forgivable to do the same here. I'll bump it up to 128k and move
it into a constant so it's not a magic number.
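
I.e. something like this (the constant name is just for illustration):

/* Limit the number of samples drained per call, since we can't
 * cond_resched() from this context.
 */
#define BPF_MAX_USER_RINGBUF_SAMPLES (128 * 1024)

        } while (err == 0 && num_samples < BPF_MAX_USER_RINGBUF_SAMPLES && ret == 0);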

> >
> > [...]
> >
> > > >         case ARG_PTR_TO_DYNPTR:
> > > > +               /* We only need to check for initialized / uninitialized helper
> > > > +                * dynptr args if the dynptr is not MEM_ALLOC, as the assumption
> > > > +                * is that if it is, that a helper function initialized the
> > > > +                * dynptr on behalf of the BPF program.
> > > > +                */
> > > > +               if (reg->type & MEM_ALLOC)
> > >
> > > Isn't PTR_TO_DYNPTR enough indication? Do we need MEM_ALLOC modifier?
> > > Normal dynptr created and used inside BPF program on the stack are
> > > actually PTR_TO_STACK, so that should be enough distinction? Or am I
> > > missing something?
> >
> > I think this would also work in the current state of the codebase, but IIUC
> > it relies on PTR_TO_STACK being the only way that a BPF program could ever
> > allocate a dynptr. I was trying to guard against the case of a helper being
> > added in the future that e.g. returned a dynamically allocated dynptr that
> > the caller would eventually need to release in another helper call.
> > MEM_ALLOC seems like the correct modifier to more generally denote that the
> > dynptr was externally allocated.  If you think this is overkill I'm totally
> > fine with removing MEM_ALLOC. We can always add it down the road if we add
> > a new helper that requires it.
> >
> 
> Hm.. I don't see a huge need for more flags for this, so I'd keep it
> simple for now and if in the future we do have such a use case, we'll
> address it at that time?

Sounds good to me.

Thanks,
David
Andrii Nakryiko Aug. 16, 2022, 10:59 p.m. UTC | #7
On Tue, Aug 16, 2022 at 3:14 PM David Vernet <void@manifault.com> wrote:
>
> On Tue, Aug 16, 2022 at 11:57:10AM -0700, Andrii Nakryiko wrote:
>
> [...]
>
> > > > > diff --git a/include/uapi/linux/bpf.h b/include/uapi/linux/bpf.h
> > > > > index a341f877b230..ca125648d7fd 100644
> > > > > --- a/include/uapi/linux/bpf.h
> > > > > +++ b/include/uapi/linux/bpf.h
> > > > > @@ -5332,6 +5332,13 @@ union bpf_attr {
> > > > >   *             **-EACCES** if the SYN cookie is not valid.
> > > > >   *
> > > > >   *             **-EPROTONOSUPPORT** if CONFIG_IPV6 is not builtin.
> > > > > + *
> > > > > + * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
> > > > > + *     Description
> > > > > + *             Drain samples from the specified user ringbuffer, and invoke the
> > > > > + *             provided callback for each such sample.
> > > >
> > > > please specify what's the expected signature of callback_fn
> > >
> > > Will do, unfortunately we're inconsistent in doing this in other helper
> > > functions, so it wasn't clear from context.
> >
> > That means we missed it for other helpers. The idea was to always
> > specify expected signature in UAPI comment, ideally we fix all the
> > missing cases.
>
> Agreed -- I'll take care of that as a follow-on patch-set.
>
> > > Agreed, I'll add a check and selftest for this.
> >
> > Yep, consider also adding a few tests where user-space intentionally
> > breaks the contract to make sure that the kernel stays intact (if you
> > already did that, apologies, I haven't looked at selftests much).
>
> The only negative tests I currently have from user-space are verifying
> that mapping permissions are correctly enforced. Happy to add some more
> that test boundaries for other parts of the API -- I agree that's both
> useful and prudent.
>
> > > > > +       /* Synchronizes with smp_store_release() in
> > > > > +        * __bpf_user_ringbuf_sample_release().
> > > > > +        */
> > > > > +       cons_pos = smp_load_acquire(&rb->consumer_pos);
> > > > > +       if (cons_pos >= prod_pos) {
> > > > > +               atomic_set(&rb->busy, 0);
> > > > > +               return -ENODATA;
> > > > > +       }
> > > > > +
> > > > > +       hdr = (u32 *)((uintptr_t)rb->data + (cons_pos & rb->mask));
> > > > > +       sample_len = *hdr;
> > > >
> > > > do we need smp_load_acquire() here? libbpf's ring_buffer
> > > > implementation uses load_acquire here
> > >
> > > I thought about this when I was first adding the logic, but I can't
> > > convince myself that it's necessary and wasn't able to figure out why we
> > > did a load acquire on the len in libbpf. The kernel doesn't do a store
> > > release on the header, so I'm not sure what the load acquire in libbpf
> > > actually accomplishes. I could certainly be missing something, but I
> > > _think_ the important thing is that we have load-acquire / store-release
> > > pairs for the consumer and producer positions.
> >
> > kernel does xchg on len on the kernel side, which is stronger than
> > smp_store_release (I think it was Paul's suggestion instead of doing
> > explicit memory barrier, but my memories are hazy for exact reasons).
>
> Hmmm, yeah, for the kernel-producer ringbuffer, I believe the explicit
> memory barrier is unnecessary because:
>
> o The smp_store_release() on producer_pos provides ordering w.r.t.
>   producer_pos, and the write to hdr->len which includes the busy-bit
>   should therefore be visible in libbpf, which does an
>   smp_load_acquire().
> o The xchg() when the sample is committed provides full barriers before
>   and after, so the consumer is guaranteed to read the written contents of
>   the sample IFF it also sees the BPF_RINGBUF_BUSY_BIT as unset.
>
> I can't see any scenario in which a barrier would add synchronization not
> already provided, though this stuff is tricky so I may have missed
> something.
>
> > Right now this might not be necessary, but if we add support for busy
> > bit in a sample header, it will be closer to what BPF ringbuf is doing
> > right now, with producer position being a reservation pointer, but
> > sample itself won't be "readable" until sample header is updated and
> > its busy bit is unset.
>
> Regarding this patch, after thinking about this more I _think_ I do need
> an xchg() (or equivalent operation with full barrier semantics) in
> userspace when updating the producer_pos when committing the sample.
> Which, after applying your suggestion (which I agree with) of supporting
> BPF_RINGBUF_BUSY_BIT and BPF_RINGBUF_DISCARD_BIT from the get-go, would
> similarly be an xchg() when setting the header, paired with an
> smp_load_acquire() when reading the header in the kernel.


right, I think __atomic_store_n() can be used in libbpf for this with
seq_cst ordering


>
> > > Yeah, I thought about this. I don't think there's any problem with clearing
> > > busy before we schedule the irq_work_queue(). I elected to do this to err
> > > on the side of simpler logic until we observed contention, but yeah, let me
> > > just do the more performant thing here.
> >
> > busy is like a global lock, so freeing it ASAP is good, so yeah,
> > unless there are some bad implications, let's do it early
>
> Ack.
>
> [...]
>
> > > > > +                       ret = callback((u64)&dynptr,
> > > > > +                                       (u64)(uintptr_t)callback_ctx, 0, 0, 0);
> > > > > +
> > > > > +                       __bpf_user_ringbuf_sample_release(rb, size, flags);
> > > > > +                       num_samples++;
> > > > > +               }
> > > > > +       } while (err == 0 && num_samples < 4096 && ret == 0);
> > > > > +
> > > >
> > > > 4096 is pretty arbitrary. Definitely worth noting it somewhere and it
> > > > seems somewhat low, tbh...
> > > >
> > > > ideally we'd cond_resched() from time to time, but that would require
> > > > BPF program to be sleepable, so we can't do that :(
> > >
> > > Yeah, I knew this would come up in discussion. I would love to do
> > > cond_resched() here, but as you said, I don't think it's an option :-/ And
> > > given the fact that we're calling back into the BPF program, we have to be
> > > cognizant of things taking a while and clogging up the CPU. What do you
> > > think is a more reasonable number than 4096?
> >
> > I don't know, tbh, but 4096 seems pretty low. For bpf_loop() we allow
> > up to 2mln iterations. I'd bump it up to 64-128K range, probably. But
> > also please move it into some internal #define'd constant, not some
> > integer literal buried in the code
>
> Sounds good to me. Maybe at some point we can make this configurable, or
> something. If bpf_loop() allows a hard-coded number of iterations, I think
> it's more forgivable to do the same here. I'll bump it up to 128k and move
> it into a constant so it's not a magic number.
>
> > >
> > > [...]
> > >
> > > > >         case ARG_PTR_TO_DYNPTR:
> > > > > +               /* We only need to check for initialized / uninitialized helper
> > > > > +                * dynptr args if the dynptr is not MEM_ALLOC, as the assumption
> > > > > +                * is that if it is, that a helper function initialized the
> > > > > +                * dynptr on behalf of the BPF program.
> > > > > +                */
> > > > > +               if (reg->type & MEM_ALLOC)
> > > >
> > > > Isn't PTR_TO_DYNPTR enough indication? Do we need MEM_ALLOC modifier?
> > > > Normal dynptr created and used inside BPF program on the stack are
> > > > actually PTR_TO_STACK, so that should be enough distinction? Or am I
> > > > missing something?
> > >
> > > I think this would also work in the current state of the codebase, but IIUC
> > > it relies on PTR_TO_STACK being the only way that a BPF program could ever
> > > allocate a dynptr. I was trying to guard against the case of a helper being
> > > added in the future that e.g. returned a dynamically allocated dynptr that
> > > the caller would eventually need to release in another helper call.
> > > MEM_ALLOC seems like the correct modifier to more generally denote that the
> > > dynptr was externally allocated.  If you think this is overkill I'm totally
> > > fine with removing MEM_ALLOC. We can always add it down the road if we add
> > > a new helper that requires it.
> > >
> >
> > Hm.. I don't see a huge need for more flags for this, so I'd keep it
> > simple for now and if in the future we do have such a use case, we'll
> > address it at that time?
>
> Sounds good to me.
>
> Thanks,
> David
David Vernet Aug. 17, 2022, 8:24 p.m. UTC | #8
On Tue, Aug 16, 2022 at 03:59:54PM -0700, Andrii Nakryiko wrote:

[...]

> > > > > > +       /* Synchronizes with smp_store_release() in
> > > > > > +        * __bpf_user_ringbuf_sample_release().
> > > > > > +        */
> > > > > > +       cons_pos = smp_load_acquire(&rb->consumer_pos);
> > > > > > +       if (cons_pos >= prod_pos) {
> > > > > > +               atomic_set(&rb->busy, 0);
> > > > > > +               return -ENODATA;
> > > > > > +       }
> > > > > > +
> > > > > > +       hdr = (u32 *)((uintptr_t)rb->data + (cons_pos & rb->mask));
> > > > > > +       sample_len = *hdr;
> > > > >
> > > > > do we need smp_load_acquire() here? libbpf's ring_buffer
> > > > > implementation uses load_acquire here
> > > >
> > > > I thought about this when I was first adding the logic, but I can't
> > > > convince myself that it's necessary and wasn't able to figure out why we
> > > > did a load acquire on the len in libbpf. The kernel doesn't do a store
> > > > release on the header, so I'm not sure what the load acquire in libbpf
> > > > actually accomplishes. I could certainly be missing something, but I
> > > > _think_ the important thing is that we have load-acquire / store-release
> > > > pairs for the consumer and producer positions.
> > >
> > > kernel does xchg on len on the kernel side, which is stronger than
> > > smp_store_release (I think it was Paul's suggestion instead of doing
> > > explicit memory barrier, but my memories are hazy for exact reasons).
> >
> > Hmmm, yeah, for the kernel-producer ringbuffer, I believe the explicit
> > memory barrier is unnecessary because:
> >
> > o The smp_store_release() on producer_pos provides ordering w.r.t.
> >   producer_pos, and the write to hdr->len which includes the busy-bit
> >   should therefore be visibile in libbpf, which does an
> >   smp_load_acquire().
> > o The xchg() when the sample is committed provides full barriers before
> >   and after, so the consumer is guaranteed to read the written contents of
> >   the sample IFF it also sees the BPF_RINGBUF_BUSY_BIT as unset.
> >
> > I can't see any scenario in which a barrier would add synchronization not
> > already provided, though this stuff is tricky so I may have missed
> > something.
> >
> > > Right now this might not be necessary, but if we add support for busy
> > > bit in a sample header, it will be closer to what BPF ringbuf is doing
> > > right now, with producer position being a reservation pointer, but
> > > sample itself won't be "readable" until sample header is updated and
> > > its busy bit is unset.
> >
> > Regarding this patch, after thinking about this more I _think_ I do need
> > an xchg() (or equivalent operation with full barrier semantics) in
> > userspace when updating the producer_pos when committing the sample.
> > Which, after applying your suggestion (which I agree with) of supporting
> > BPF_RINGBUF_BUSY_BIT and BPF_RINGBUF_DISCARD_BIT from the get-go, would
> > similarly be an xchg() when setting the header, paired with an
> > smp_load_acquire() when reading the header in the kernel.
> 
> 
> right, I think __atomic_store_n() can be used in libbpf for this with
> seq_cst ordering

__atomic_store_n(__ATOMIC_SEQ_CST) will do the correct thing on x86, but it
is not guaranteed to provide a full acq/rel barrier according to the C
standard. __atomic_store_n(__ATOMIC_SEQ_CST) means "store-release, and also
participates in the sequentially-consistent global ordering".

I believe we actually need an __atomic_store_n(__ATOMIC_ACQ_REL) here. I
don't _think_ __ATOMIC_SEQ_CST is necessary because we're not reliant on
any type of global, SC ordering. We just need to ensure that the sample is
only visible after the busy bit has been removed (or discard bit added) to
the header.

Thanks,
David
David Vernet Aug. 17, 2022, 9:03 p.m. UTC | #9
On Wed, Aug 17, 2022 at 03:24:40PM -0500, David Vernet wrote:

> [...]
> > right, I think __atomic_store_n() can be used in libbpf for this with
> > seq_cst ordering
> 
> __atomic_store_n(__ATOMIC_SEQ_CST) will do the correct thing on x86, but it
> is not guaranteed to provide a full acq/rel barrier according to the C
> standard. __atomic_store_n(__ATOMIC_SEQ_CST) means "store-release, and also
> participates in the sequentially-consistent global ordering".
> 
> I believe we actually need an __atomic_store_n(__ATOMIC_ACQ_REL) here. I

Sorry, I meant __atomic_exchange_n() rather than __atomic_store_n().
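
So, concretely, committing a sample from user-space would be something
like this (sketch; assume 'hdr' points at the u32 length word of the
reserved sample in the mmap'd data area, 'sample_len' is the final
length, and the payload has already been written):

/* Publish the sample: clear the busy bit (or set the discard bit) with
 * full barrier semantics, so the kernel can only observe the final
 * header value after all of the payload writes.
 */
__atomic_exchange_n(hdr, sample_len, __ATOMIC_ACQ_REL);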
diff mbox series

Patch

diff --git a/include/linux/bpf.h b/include/linux/bpf.h
index 20c26aed7896..4d0d5f2a3ac8 100644
--- a/include/linux/bpf.h
+++ b/include/linux/bpf.h
@@ -401,7 +401,7 @@  enum bpf_type_flag {
 	/* DYNPTR points to memory local to the bpf program. */
 	DYNPTR_TYPE_LOCAL	= BIT(8 + BPF_BASE_TYPE_BITS),
 
-	/* DYNPTR points to a ringbuf record. */
+	/* DYNPTR points to a kernel-produced ringbuf record. */
 	DYNPTR_TYPE_RINGBUF	= BIT(9 + BPF_BASE_TYPE_BITS),
 
 	/* Size is known at compile time. */
@@ -606,6 +606,7 @@  enum bpf_reg_type {
 	PTR_TO_MEM,		 /* reg points to valid memory region */
 	PTR_TO_BUF,		 /* reg points to a read/write buffer */
 	PTR_TO_FUNC,		 /* reg points to a bpf program function */
+	PTR_TO_DYNPTR,		 /* reg points to a dynptr */
 	__BPF_REG_TYPE_MAX,
 
 	/* Extended reg_types. */
@@ -2410,6 +2411,7 @@  extern const struct bpf_func_proto bpf_loop_proto;
 extern const struct bpf_func_proto bpf_copy_from_user_task_proto;
 extern const struct bpf_func_proto bpf_set_retval_proto;
 extern const struct bpf_func_proto bpf_get_retval_proto;
+extern const struct bpf_func_proto bpf_user_ringbuf_drain_proto;
 
 const struct bpf_func_proto *tracing_prog_func_proto(
   enum bpf_func_id func_id, const struct bpf_prog *prog);
@@ -2554,7 +2556,7 @@  enum bpf_dynptr_type {
 	BPF_DYNPTR_TYPE_INVALID,
 	/* Points to memory that is local to the bpf program */
 	BPF_DYNPTR_TYPE_LOCAL,
-	/* Underlying data is a ringbuf record */
+	/* Underlying data is a kernel-produced ringbuf record */
 	BPF_DYNPTR_TYPE_RINGBUF,
 };
 
diff --git a/include/uapi/linux/bpf.h b/include/uapi/linux/bpf.h
index a341f877b230..ca125648d7fd 100644
--- a/include/uapi/linux/bpf.h
+++ b/include/uapi/linux/bpf.h
@@ -5332,6 +5332,13 @@  union bpf_attr {
  *		**-EACCES** if the SYN cookie is not valid.
  *
  *		**-EPROTONOSUPPORT** if CONFIG_IPV6 is not builtin.
+ *
+ * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
+ *	Description
+ *		Drain samples from the specified user ringbuffer, and invoke the
+ *		provided callback for each such sample.
+ *	Return
+ *		An error if a sample could not be drained.
  */
 #define __BPF_FUNC_MAPPER(FN)		\
 	FN(unspec),			\
@@ -5542,6 +5549,7 @@  union bpf_attr {
 	FN(tcp_raw_gen_syncookie_ipv6),	\
 	FN(tcp_raw_check_syncookie_ipv4),	\
 	FN(tcp_raw_check_syncookie_ipv6),	\
+	FN(user_ringbuf_drain),	        \
 	/* */
 
 /* integer value in 'imm' field of BPF_CALL instruction selects which helper
diff --git a/kernel/bpf/helpers.c b/kernel/bpf/helpers.c
index 1f961f9982d2..b8097876014c 100644
--- a/kernel/bpf/helpers.c
+++ b/kernel/bpf/helpers.c
@@ -1647,6 +1647,8 @@  bpf_base_func_proto(enum bpf_func_id func_id)
 		return &bpf_dynptr_write_proto;
 	case BPF_FUNC_dynptr_data:
 		return &bpf_dynptr_data_proto;
+	case BPF_FUNC_user_ringbuf_drain:
+		return &bpf_user_ringbuf_drain_proto;
 	default:
 		break;
 	}
diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
index 29e2de42df15..fc589fc8eb7c 100644
--- a/kernel/bpf/ringbuf.c
+++ b/kernel/bpf/ringbuf.c
@@ -291,16 +291,33 @@  static unsigned long ringbuf_avail_data_sz(struct bpf_ringbuf *rb)
 }
 
 static __poll_t ringbuf_map_poll(struct bpf_map *map, struct file *filp,
-				 struct poll_table_struct *pts)
+				 struct poll_table_struct *pts,
+				 bool kernel_producer)
 {
 	struct bpf_ringbuf_map *rb_map;
+	bool buffer_empty;
 
 	rb_map = container_of(map, struct bpf_ringbuf_map, map);
 	poll_wait(filp, &rb_map->rb->waitq, pts);
 
-	if (ringbuf_avail_data_sz(rb_map->rb))
-		return EPOLLIN | EPOLLRDNORM;
-	return 0;
+	buffer_empty = !ringbuf_avail_data_sz(rb_map->rb);
+
+	if (kernel_producer)
+		return buffer_empty ? 0 : EPOLLIN | EPOLLRDNORM;
+	else
+		return buffer_empty ? EPOLLOUT | EPOLLWRNORM : 0;
+}
+
+static __poll_t ringbuf_map_poll_kern(struct bpf_map *map, struct file *filp,
+				      struct poll_table_struct *pts)
+{
+	return ringbuf_map_poll(map, filp, pts, true);
+}
+
+static __poll_t ringbuf_map_poll_user(struct bpf_map *map, struct file *filp,
+				      struct poll_table_struct *pts)
+{
+	return ringbuf_map_poll(map, filp, pts, false);
 }
 
 BTF_ID_LIST_SINGLE(ringbuf_map_btf_ids, struct, bpf_ringbuf_map)
@@ -309,7 +326,7 @@  const struct bpf_map_ops ringbuf_map_ops = {
 	.map_alloc = ringbuf_map_alloc,
 	.map_free = ringbuf_map_free,
 	.map_mmap = ringbuf_map_mmap_kern,
-	.map_poll = ringbuf_map_poll,
+	.map_poll = ringbuf_map_poll_kern,
 	.map_lookup_elem = ringbuf_map_lookup_elem,
 	.map_update_elem = ringbuf_map_update_elem,
 	.map_delete_elem = ringbuf_map_delete_elem,
@@ -323,6 +340,7 @@  const struct bpf_map_ops user_ringbuf_map_ops = {
 	.map_alloc = ringbuf_map_alloc,
 	.map_free = ringbuf_map_free,
 	.map_mmap = ringbuf_map_mmap_user,
+	.map_poll = ringbuf_map_poll_user,
 	.map_lookup_elem = ringbuf_map_lookup_elem,
 	.map_update_elem = ringbuf_map_update_elem,
 	.map_delete_elem = ringbuf_map_delete_elem,
@@ -605,3 +623,132 @@  const struct bpf_func_proto bpf_ringbuf_discard_dynptr_proto = {
 	.arg1_type	= ARG_PTR_TO_DYNPTR | DYNPTR_TYPE_RINGBUF | OBJ_RELEASE,
 	.arg2_type	= ARG_ANYTHING,
 };
+
+static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample,
+				   u32 *size)
+{
+	unsigned long cons_pos, prod_pos;
+	u32 sample_len, total_len;
+	u32 *hdr;
+	int err;
+	int busy = 0;
+
+	/* If another consumer is already consuming a sample, wait for them to
+	 * finish.
+	 */
+	if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
+		return -EBUSY;
+
+	/* Synchronizes with smp_store_release() in user-space. */
+	prod_pos = smp_load_acquire(&rb->producer_pos);
+	/* Synchronizes with smp_store_release() in
+	 * __bpf_user_ringbuf_sample_release().
+	 */
+	cons_pos = smp_load_acquire(&rb->consumer_pos);
+	if (cons_pos >= prod_pos) {
+		atomic_set(&rb->busy, 0);
+		return -ENODATA;
+	}
+
+	hdr = (u32 *)((uintptr_t)rb->data + (cons_pos & rb->mask));
+	sample_len = *hdr;
+
+	/* Check that the sample can fit into a dynptr. */
+	err = bpf_dynptr_check_size(sample_len);
+	if (err) {
+		atomic_set(&rb->busy, 0);
+		return err;
+	}
+
+	/* Check that the sample fits within the region advertised by the
+	 * consumer position.
+	 */
+	total_len = sample_len + BPF_RINGBUF_HDR_SZ;
+	if (total_len > prod_pos - cons_pos) {
+		atomic_set(&rb->busy, 0);
+		return -E2BIG;
+	}
+
+	/* Check that the sample fits within the data region of the ring buffer.
+	 */
+	if (total_len > rb->mask + 1) {
+		atomic_set(&rb->busy, 0);
+		return -E2BIG;
+	}
+
+	/* consumer_pos is updated when the sample is released.
+	 */
+
+	*sample = (void *)((uintptr_t)rb->data +
+			   ((cons_pos + BPF_RINGBUF_HDR_SZ) & rb->mask));
+	*size = sample_len;
+
+	return 0;
+}
+
+static void
+__bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
+				  u64 flags)
+{
+
+
+	/* To release the ringbuffer, just increment the producer position to
+	 * signal that a new sample can be consumed. The busy bit is cleared by
+	 * userspace when posting a new sample to the ringbuffer.
+	 */
+	smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
+			  BPF_RINGBUF_HDR_SZ);
+
+	if (flags & BPF_RB_FORCE_WAKEUP || !(flags & BPF_RB_NO_WAKEUP))
+		irq_work_queue(&rb->work);
+
+	atomic_set(&rb->busy, 0);
+}
+
+BPF_CALL_4(bpf_user_ringbuf_drain, struct bpf_map *, map,
+	   void *, callback_fn, void *, callback_ctx, u64, flags)
+{
+	struct bpf_ringbuf *rb;
+	long num_samples = 0, ret = 0;
+	int err;
+	bpf_callback_t callback = (bpf_callback_t)callback_fn;
+	u64 wakeup_flags = BPF_RB_NO_WAKEUP | BPF_RB_FORCE_WAKEUP;
+
+	if (unlikely(flags & ~wakeup_flags))
+		return -EINVAL;
+
+	/* The two wakeup flags are mutually exclusive. */
+	if (unlikely((flags & wakeup_flags) == wakeup_flags))
+		return -EINVAL;
+
+	rb = container_of(map, struct bpf_ringbuf_map, map)->rb;
+	do {
+		u32 size;
+		void *sample;
+
+		err = __bpf_user_ringbuf_poll(rb, &sample, &size);
+
+		if (!err) {
+			struct bpf_dynptr_kern dynptr;
+
+			bpf_dynptr_init(&dynptr, sample, BPF_DYNPTR_TYPE_LOCAL,
+					0, size);
+			ret = callback((u64)&dynptr,
+					(u64)(uintptr_t)callback_ctx, 0, 0, 0);
+
+			__bpf_user_ringbuf_sample_release(rb, size, flags);
+			num_samples++;
+		}
+	} while (err == 0 && num_samples < 4096 && ret == 0);
+
+	return num_samples;
+}
+
+const struct bpf_func_proto bpf_user_ringbuf_drain_proto = {
+	.func		= bpf_user_ringbuf_drain,
+	.ret_type	= RET_INTEGER,
+	.arg1_type	= ARG_CONST_MAP_PTR,
+	.arg2_type	= ARG_PTR_TO_FUNC,
+	.arg3_type	= ARG_PTR_TO_STACK_OR_NULL,
+	.arg4_type	= ARG_ANYTHING,
+};
diff --git a/kernel/bpf/verifier.c b/kernel/bpf/verifier.c
index 4a9562c62af0..99dfdc3ed187 100644
--- a/kernel/bpf/verifier.c
+++ b/kernel/bpf/verifier.c
@@ -555,6 +555,7 @@  static const char *reg_type_str(struct bpf_verifier_env *env,
 		[PTR_TO_BUF]		= "buf",
 		[PTR_TO_FUNC]		= "func",
 		[PTR_TO_MAP_KEY]	= "map_key",
+		[PTR_TO_DYNPTR]		= "dynptr_ptr",
 	};
 
 	if (type & PTR_MAYBE_NULL) {
@@ -5656,6 +5657,12 @@  static const struct bpf_reg_types stack_ptr_types = { .types = { PTR_TO_STACK }
 static const struct bpf_reg_types const_str_ptr_types = { .types = { PTR_TO_MAP_VALUE } };
 static const struct bpf_reg_types timer_types = { .types = { PTR_TO_MAP_VALUE } };
 static const struct bpf_reg_types kptr_types = { .types = { PTR_TO_MAP_VALUE } };
+static const struct bpf_reg_types dynptr_types = {
+	.types = {
+		PTR_TO_STACK,
+		PTR_TO_DYNPTR | MEM_ALLOC | DYNPTR_TYPE_LOCAL,
+	}
+};
 
 static const struct bpf_reg_types *compatible_reg_types[__BPF_ARG_TYPE_MAX] = {
 	[ARG_PTR_TO_MAP_KEY]		= &map_key_value_types,
@@ -5682,7 +5689,7 @@  static const struct bpf_reg_types *compatible_reg_types[__BPF_ARG_TYPE_MAX] = {
 	[ARG_PTR_TO_CONST_STR]		= &const_str_ptr_types,
 	[ARG_PTR_TO_TIMER]		= &timer_types,
 	[ARG_PTR_TO_KPTR]		= &kptr_types,
-	[ARG_PTR_TO_DYNPTR]		= &stack_ptr_types,
+	[ARG_PTR_TO_DYNPTR]		= &dynptr_types,
 };
 
 static int check_reg_type(struct bpf_verifier_env *env, u32 regno,
@@ -6025,6 +6032,13 @@  static int check_func_arg(struct bpf_verifier_env *env, u32 arg,
 		err = check_mem_size_reg(env, reg, regno, true, meta);
 		break;
 	case ARG_PTR_TO_DYNPTR:
+		/* We only need to check for initialized / uninitialized helper
+		 * dynptr args if the dynptr is not MEM_ALLOC, as the assumption
+		 * is that if it is, that a helper function initialized the
+		 * dynptr on behalf of the BPF program.
+		 */
+		if (reg->type & MEM_ALLOC)
+			break;
 		if (arg_type & MEM_UNINIT) {
 			if (!is_dynptr_reg_valid_uninit(env, reg)) {
 				verbose(env, "Dynptr has to be an uninitialized dynptr\n");
@@ -6197,7 +6211,9 @@  static int check_map_func_compatibility(struct bpf_verifier_env *env,
 			goto error;
 		break;
 	case BPF_MAP_TYPE_USER_RINGBUF:
-		goto error;
+		if (func_id != BPF_FUNC_user_ringbuf_drain)
+			goto error;
+		break;
 	case BPF_MAP_TYPE_STACK_TRACE:
 		if (func_id != BPF_FUNC_get_stackid)
 			goto error;
@@ -6317,6 +6333,10 @@  static int check_map_func_compatibility(struct bpf_verifier_env *env,
 		if (map->map_type != BPF_MAP_TYPE_RINGBUF)
 			goto error;
 		break;
+	case BPF_FUNC_user_ringbuf_drain:
+		if (map->map_type != BPF_MAP_TYPE_USER_RINGBUF)
+			goto error;
+		break;
 	case BPF_FUNC_get_stackid:
 		if (map->map_type != BPF_MAP_TYPE_STACK_TRACE)
 			goto error;
@@ -6901,6 +6921,29 @@  static int set_find_vma_callback_state(struct bpf_verifier_env *env,
 	return 0;
 }
 
+static int set_user_ringbuf_callback_state(struct bpf_verifier_env *env,
+					   struct bpf_func_state *caller,
+					   struct bpf_func_state *callee,
+					   int insn_idx)
+{
+	/* bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void
+	 *			  callback_ctx, u64 flags);
+	 * callback_fn(struct bpf_dynptr_t* dynptr, void *callback_ctx);
+	 */
+	__mark_reg_not_init(env, &callee->regs[BPF_REG_0]);
+	callee->regs[BPF_REG_1].type = PTR_TO_DYNPTR | DYNPTR_TYPE_LOCAL | MEM_ALLOC;
+	__mark_reg_known_zero(&callee->regs[BPF_REG_1]);
+	callee->regs[BPF_REG_2] = caller->regs[BPF_REG_3];
+
+	/* unused */
+	__mark_reg_not_init(env, &callee->regs[BPF_REG_3]);
+	__mark_reg_not_init(env, &callee->regs[BPF_REG_4]);
+	__mark_reg_not_init(env, &callee->regs[BPF_REG_5]);
+
+	callee->in_callback_fn = true;
+	return 0;
+}
+
 static int prepare_func_exit(struct bpf_verifier_env *env, int *insn_idx)
 {
 	struct bpf_verifier_state *state = env->cur_state;
@@ -7345,6 +7388,10 @@  static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
 			}
 		}
 		break;
+	case BPF_FUNC_user_ringbuf_drain:
+		err = __check_func_call(env, insn, insn_idx_p, meta.subprogno,
+					set_user_ringbuf_callback_state);
+		break;
 	}
 
 	if (err)
@@ -7477,11 +7524,15 @@  static int check_helper_call(struct bpf_verifier_env *env, struct bpf_insn *insn
 		/* Find the id of the dynptr we're acquiring a reference to */
 		for (i = 0; i < MAX_BPF_FUNC_REG_ARGS; i++) {
 			if (arg_type_is_dynptr(fn->arg_type[i])) {
+				struct bpf_reg_state *reg = &regs[BPF_REG_1 + i];
+
 				if (dynptr_id) {
 					verbose(env, "verifier internal error: multiple dynptr args in func\n");
 					return -EFAULT;
 				}
-				dynptr_id = stack_slot_get_id(env, &regs[BPF_REG_1 + i]);
+
+				if (!(reg->type & MEM_ALLOC))
+					dynptr_id = stack_slot_get_id(env, reg);
 			}
 		}
 		/* For release_reference() */
diff --git a/tools/include/uapi/linux/bpf.h b/tools/include/uapi/linux/bpf.h
index ce0ce713faf9..e5e4ab12177c 100644
--- a/tools/include/uapi/linux/bpf.h
+++ b/tools/include/uapi/linux/bpf.h
@@ -5332,6 +5332,13 @@  union bpf_attr {
  *		**-EACCES** if the SYN cookie is not valid.
  *
  *		**-EPROTONOSUPPORT** if CONFIG_IPV6 is not builtin.
+ *
+ * long bpf_user_ringbuf_drain(struct bpf_map *map, void *callback_fn, void *ctx, u64 flags)
+ *	Description
+ *		Drain samples from the specified user ringbuffer, and invoke the
+ *		provided callback for each such sample.
+ *	Return
+ *		An error if a sample could not be drained.
  */
 #define __BPF_FUNC_MAPPER(FN)		\
 	FN(unspec),			\
@@ -5542,6 +5549,7 @@  union bpf_attr {
 	FN(tcp_raw_gen_syncookie_ipv6),	\
 	FN(tcp_raw_check_syncookie_ipv4),	\
 	FN(tcp_raw_check_syncookie_ipv6),	\
+	FN(bpf_user_ringbuf_drain),	\
 	/* */
 
 /* integer value in 'imm' field of BPF_CALL instruction selects which helper