
[4/5] bpf: Add libbpf logic for user-space ring buffer

Message ID 20220808155341.2479054-4-void@manifault.com (mailing list archive)
State Superseded
Delegated to: BPF
Series bpf: Add user-space-publisher ringbuffer map type

Checks

Context Check Description
netdev/tree_selection success Guessed tree name to be net-next, async
netdev/fixes_present success Fixes tag not required for -next series
netdev/subject_prefix success Link
netdev/cover_letter success Series has a cover letter
netdev/patch_count success Link
netdev/header_inline success No static functions without inline keyword in header files
netdev/build_32bit success Errors and warnings before: 6 this patch: 6
netdev/cc_maintainers success CCed 12 of 12 maintainers
netdev/build_clang success Errors and warnings before: 7 this patch: 7
netdev/module_param success Was 0 now: 0
netdev/verify_signedoff success Signed-off-by tag matches author and committer
netdev/check_selftest success No net selftest shell script
netdev/verify_fixes success No Fixes tag
netdev/build_allmodconfig_warn success Errors and warnings before: 2 this patch: 2
netdev/checkpatch warning CHECK: Blank lines aren't necessary after an open brace '{'
netdev/kdoc success Errors and warnings before: 76 this patch: 76
netdev/source_inline success Was 0 now: 0
bpf/vmtest-bpf-next-PR fail PR summary
bpf/vmtest-bpf-next-VM_Test-1 fail Logs for Kernel LATEST on ubuntu-latest with gcc
bpf/vmtest-bpf-next-VM_Test-2 fail Logs for Kernel LATEST on ubuntu-latest with llvm-16
bpf/vmtest-bpf-next-VM_Test-3 fail Logs for Kernel LATEST on z15 with gcc

Commit Message

David Vernet Aug. 8, 2022, 3:53 p.m. UTC
Now that all of the logic is in place in the kernel to support user-space
produced ringbuffers, we can add the user-space logic to libbpf.

Signed-off-by: David Vernet <void@manifault.com>
---
 kernel/bpf/ringbuf.c          |   7 +-
 tools/lib/bpf/libbpf.c        |  10 +-
 tools/lib/bpf/libbpf.h        |  19 +++
 tools/lib/bpf/libbpf.map      |   6 +
 tools/lib/bpf/libbpf_probes.c |   1 +
 tools/lib/bpf/ringbuf.c       | 216 ++++++++++++++++++++++++++++++++++
 6 files changed, 256 insertions(+), 3 deletions(-)
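
For reference, here is a minimal usage sketch of the producer API added by this
patch, based on the declarations in the libbpf.h hunk below. The map fd, message
buffer, and error handling are illustrative assumptions, not part of the patch:

#include <errno.h>
#include <stdint.h>
#include <string.h>
#include <bpf/libbpf.h>

/* Illustrative only: map_fd must refer to a BPF_MAP_TYPE_USER_RINGBUF map
 * created elsewhere (e.g. via an already-loaded skeleton).
 */
static int publish_sample(int map_fd, const void *msg, uint32_t len)
{
	struct ring_buffer_user_opts opts = { .sz = sizeof(opts) };
	struct ring_buffer_user *rb;
	void *sample;

	rb = ring_buffer_user__new(map_fd, &opts);
	if (!rb)
		return -errno;

	/* Block for up to 1s waiting for enough free space, then reserve. */
	sample = ring_buffer_user__poll(rb, len, 1000);
	if (!sample) {
		ring_buffer_user__free(rb);
		return -ENOSPC;
	}

	memcpy(sample, msg, len);
	ring_buffer_user__submit(rb, sample);
	ring_buffer_user__free(rb);
	return 0;
}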

Comments

Andrii Nakryiko Aug. 11, 2022, 11:39 p.m. UTC | #1
On Mon, Aug 8, 2022 at 8:54 AM David Vernet <void@manifault.com> wrote:
>
> Now that all of the logic is in place in the kernel to support user-space
> produced ringbuffers, we can add the user-space logic to libbpf.
>
> Signed-off-by: David Vernet <void@manifault.com>
> ---
>  kernel/bpf/ringbuf.c          |   7 +-
>  tools/lib/bpf/libbpf.c        |  10 +-
>  tools/lib/bpf/libbpf.h        |  19 +++
>  tools/lib/bpf/libbpf.map      |   6 +
>  tools/lib/bpf/libbpf_probes.c |   1 +
>  tools/lib/bpf/ringbuf.c       | 216 ++++++++++++++++++++++++++++++++++
>  6 files changed, 256 insertions(+), 3 deletions(-)
>
> diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
> index fc589fc8eb7c..a10558e79ec8 100644
> --- a/kernel/bpf/ringbuf.c
> +++ b/kernel/bpf/ringbuf.c
> @@ -639,7 +639,9 @@ static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample,
>         if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
>                 return -EBUSY;
>
> -       /* Synchronizes with smp_store_release() in user-space. */
> +       /* Synchronizes with smp_store_release() in __ring_buffer_user__commit()
> +        * in user-space.
> +        */

let's not hard-code libbpf function names in kernel comments, it's
still user-space

>         prod_pos = smp_load_acquire(&rb->producer_pos);
>         /* Synchronizes with smp_store_release() in
>          * __bpf_user_ringbuf_sample_release().
> @@ -695,6 +697,9 @@ __bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
>         /* To release the ringbuffer, just increment the producer position to
>          * signal that a new sample can be consumed. The busy bit is cleared by
>          * userspace when posting a new sample to the ringbuffer.
> +        *
> +        * Synchronizes with smp_load_acquire() in ring_buffer_user__reserve()
> +        * in user-space.
>          */

same

>         smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
>                           BPF_RINGBUF_HDR_SZ);
> diff --git a/tools/lib/bpf/libbpf.c b/tools/lib/bpf/libbpf.c
> index 9c1f2d09f44e..f7fe09dce643 100644
> --- a/tools/lib/bpf/libbpf.c
> +++ b/tools/lib/bpf/libbpf.c
> @@ -2367,6 +2367,12 @@ static size_t adjust_ringbuf_sz(size_t sz)
>         return sz;
>  }
>
> +static bool map_is_ringbuf(const struct bpf_map *map)
> +{
> +       return map->def.type == BPF_MAP_TYPE_RINGBUF ||
> +              map->def.type == BPF_MAP_TYPE_USER_RINGBUF;
> +}
> +
>  static void fill_map_from_def(struct bpf_map *map, const struct btf_map_def *def)
>  {
>         map->def.type = def->map_type;
> @@ -2381,7 +2387,7 @@ static void fill_map_from_def(struct bpf_map *map, const struct btf_map_def *def
>         map->btf_value_type_id = def->value_type_id;
>
>         /* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */
> -       if (map->def.type == BPF_MAP_TYPE_RINGBUF)
> +       if (map_is_ringbuf(map))
>                 map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries);
>
>         if (def->parts & MAP_DEF_MAP_TYPE)
> @@ -4363,7 +4369,7 @@ int bpf_map__set_max_entries(struct bpf_map *map, __u32 max_entries)
>         map->def.max_entries = max_entries;
>
>         /* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */
> -       if (map->def.type == BPF_MAP_TYPE_RINGBUF)
> +       if (map_is_ringbuf(map))
>                 map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries);
>
>         return 0;
> diff --git a/tools/lib/bpf/libbpf.h b/tools/lib/bpf/libbpf.h
> index 61493c4cddac..6d1d0539b08d 100644
> --- a/tools/lib/bpf/libbpf.h
> +++ b/tools/lib/bpf/libbpf.h
> @@ -1009,6 +1009,7 @@ LIBBPF_API int bpf_tc_query(const struct bpf_tc_hook *hook,
>
>  /* Ring buffer APIs */
>  struct ring_buffer;
> +struct ring_buffer_user;


I know that I'm the one asking to use ring_buffer_user name, but given
kernel is using USER_RINGBUF and user_ringbuf naming, let's be
consistent and use user_ring_buffer in libbpf as well. I was
contemplating uring_buffer, can't make up my mind if it's better or
not.

Also ring_buffer_user reads like "user of ring buffer", which adds to
confusion :)


>
>  typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size);
>
> @@ -1028,6 +1029,24 @@ LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms);
>  LIBBPF_API int ring_buffer__consume(struct ring_buffer *rb);
>  LIBBPF_API int ring_buffer__epoll_fd(const struct ring_buffer *rb);
>
> +struct ring_buffer_user_opts {
> +       size_t sz; /* size of this struct, for forward/backward compatibility */
> +};
> +
> +#define ring_buffer_user_opts__last_field sz
> +
> +LIBBPF_API struct ring_buffer_user *
> +ring_buffer_user__new(int map_fd, const struct ring_buffer_user_opts *opts);
> +LIBBPF_API void *ring_buffer_user__reserve(struct ring_buffer_user *rb,
> +                                          uint32_t size);
> +LIBBPF_API void *ring_buffer_user__poll(struct ring_buffer_user *rb,
> +                                       uint32_t size, int timeout_ms);
> +LIBBPF_API void ring_buffer_user__submit(struct ring_buffer_user *rb,
> +                                        void *sample);
> +LIBBPF_API void ring_buffer_user__discard(struct ring_buffer_user *rb,
> +                                         void *sample);
> +LIBBPF_API void ring_buffer_user__free(struct ring_buffer_user *rb);
> +
>  /* Perf buffer APIs */
>  struct perf_buffer;
>
> diff --git a/tools/lib/bpf/libbpf.map b/tools/lib/bpf/libbpf.map
> index 119e6e1ea7f1..8db11040df1b 100644
> --- a/tools/lib/bpf/libbpf.map
> +++ b/tools/lib/bpf/libbpf.map
> @@ -365,4 +365,10 @@ LIBBPF_1.0.0 {
>                 libbpf_bpf_map_type_str;
>                 libbpf_bpf_prog_type_str;
>                 perf_buffer__buffer;
> +               ring_buffer_user__discard;
> +               ring_buffer_user__free;
> +               ring_buffer_user__new;
> +               ring_buffer_user__poll;
> +               ring_buffer_user__reserve;
> +               ring_buffer_user__submit;
>  };
> diff --git a/tools/lib/bpf/libbpf_probes.c b/tools/lib/bpf/libbpf_probes.c
> index 6d495656f554..f3a8e8e74eb8 100644
> --- a/tools/lib/bpf/libbpf_probes.c
> +++ b/tools/lib/bpf/libbpf_probes.c
> @@ -231,6 +231,7 @@ static int probe_map_create(enum bpf_map_type map_type)
>                         return btf_fd;
>                 break;
>         case BPF_MAP_TYPE_RINGBUF:
> +       case BPF_MAP_TYPE_USER_RINGBUF:
>                 key_size = 0;
>                 value_size = 0;
>                 max_entries = 4096;
> diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c
> index 8bc117bcc7bc..86e3c11d8486 100644
> --- a/tools/lib/bpf/ringbuf.c
> +++ b/tools/lib/bpf/ringbuf.c
> @@ -39,6 +39,17 @@ struct ring_buffer {
>         int ring_cnt;
>  };
>
> +struct ring_buffer_user {
> +       struct epoll_event event;
> +       unsigned long *consumer_pos;
> +       unsigned long *producer_pos;
> +       void *data;
> +       unsigned long mask;
> +       size_t page_size;
> +       int map_fd;
> +       int epoll_fd;
> +};
> +
>  static void ringbuf_unmap_ring(struct ring_buffer *rb, struct ring *r)
>  {
>         if (r->consumer_pos) {
> @@ -300,3 +311,208 @@ int ring_buffer__epoll_fd(const struct ring_buffer *rb)
>  {
>         return rb->epoll_fd;
>  }
> +
> +static void __user_ringbuf_unmap_ring(struct ring_buffer_user *rb)

libbpf generally doesn't use double underscore naming pattern, let's
not do that here as well

> +{
> +       if (rb->consumer_pos) {
> +               munmap(rb->consumer_pos, rb->page_size);
> +               rb->consumer_pos = NULL;
> +       }
> +       if (rb->producer_pos) {
> +               munmap(rb->producer_pos, rb->page_size + 2 * (rb->mask + 1));
> +               rb->producer_pos = NULL;
> +       }
> +}
> +
> +void ring_buffer_user__free(struct ring_buffer_user *rb)
> +{
> +       if (!rb)
> +               return;
> +
> +       __user_ringbuf_unmap_ring(rb);
> +
> +       if (rb->epoll_fd >= 0)
> +               close(rb->epoll_fd);
> +
> +       free(rb);
> +}
> +
> +static int __ring_buffer_user_map(struct ring_buffer_user *rb, int map_fd)
> +{
> +

please don't leave stray empty lines around the code


> +       struct bpf_map_info info;
> +       __u32 len = sizeof(info);
> +       void *tmp;
> +       struct epoll_event *rb_epoll;
> +       int err;
> +
> +       memset(&info, 0, sizeof(info));
> +
> +       err = bpf_obj_get_info_by_fd(map_fd, &info, &len);
> +       if (err) {
> +               err = -errno;
> +               pr_warn("user ringbuf: failed to get map info for fd=%d: %d\n",
> +                       map_fd, err);
> +               return libbpf_err(err);
> +       }
> +
> +       if (info.type != BPF_MAP_TYPE_USER_RINGBUF) {
> +               pr_warn("user ringbuf: map fd=%d is not BPF_MAP_TYPE_USER_RINGBUF\n",
> +                       map_fd);
> +               return libbpf_err(-EINVAL);
> +       }
> +
> +       rb->map_fd = map_fd;
> +       rb->mask = info.max_entries - 1;
> +
> +       /* Map read-only consumer page */
> +       tmp = mmap(NULL, rb->page_size, PROT_READ, MAP_SHARED, map_fd, 0);
> +       if (tmp == MAP_FAILED) {
> +               err = -errno;
> +               pr_warn("user ringbuf: failed to mmap consumer page for map fd=%d: %d\n",
> +                       map_fd, err);
> +               return libbpf_err(err);
> +       }
> +       rb->consumer_pos = tmp;
> +
> +       /* Map read-write the producer page and data pages. We map the data
> +        * region as twice the total size of the ringbuffer to allow the simple
> +        * reading and writing of samples that wrap around the end of the
> +        * buffer.  See the kernel implementation for details.
> +        */
> +       tmp = mmap(NULL, rb->page_size + 2 * info.max_entries,
> +                  PROT_READ | PROT_WRITE, MAP_SHARED, map_fd, rb->page_size);
> +       if (tmp == MAP_FAILED) {
> +               err = -errno;
> +               pr_warn("user ringbuf: failed to mmap data pages for map fd=%d: %d\n",
> +                       map_fd, err);
> +               return libbpf_err(err);
> +       }
> +
> +       rb->producer_pos = tmp;
> +       rb->data = tmp + rb->page_size;
> +
> +       rb_epoll = &rb->event;
> +       rb_epoll->events = EPOLLOUT;
> +       if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, rb_epoll) < 0) {
> +               err = -errno;
> +               pr_warn("user ringbuf: failed to epoll add map fd=%d: %d\n",
> +                       map_fd, err);
> +               return libbpf_err(err);
> +       }
> +
> +       return 0;
> +}
> +
> +struct ring_buffer_user *
> +ring_buffer_user__new(int map_fd, const struct ring_buffer_user_opts *opts)
> +{
> +       struct ring_buffer_user *rb;
> +       int err;
> +
> +       if (!OPTS_VALID(opts, ring_buffer_opts))
> +               return errno = EINVAL, NULL;
> +
> +       rb = calloc(1, sizeof(*rb));
> +       if (!rb)
> +               return errno = ENOMEM, NULL;
> +
> +       rb->page_size = getpagesize();
> +
> +       rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
> +       if (rb->epoll_fd < 0) {
> +               err = -errno;
> +               pr_warn("user ringbuf: failed to create epoll instance: %d\n",
> +                       err);
> +               goto err_out;
> +       }
> +
> +       err = __ring_buffer_user_map(rb, map_fd);
> +       if (err)
> +               goto err_out;
> +
> +       return rb;
> +
> +err_out:
> +       ring_buffer_user__free(rb);
> +       return errno = -err, NULL;
> +}
> +
> +static void __ring_buffer_user__commit(struct ring_buffer_user *rb)
> +{
> +       uint32_t *hdr;
> +       uint32_t total_len;
> +       unsigned long prod_pos;
> +
> +       prod_pos = *rb->producer_pos;
> +       hdr = rb->data + (prod_pos & rb->mask);
> +
> +       total_len = *hdr + BPF_RINGBUF_HDR_SZ;

round up to multiple of 8

> +
> +       /* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_poll() in
> +        * the kernel.
> +        */
> +       smp_store_release(rb->producer_pos, prod_pos + total_len);
> +}
> +
> +/* Discard a previously reserved sample into the ring buffer. Because the user
> + * ringbuffer is assumed to be single producer, this can simply be a no-op, and
> + * the producer pointer is left in the same place as when it was reserved.
> + */
> +void ring_buffer_user__discard(struct ring_buffer_user *rb, void *sample)
> +{}

{
}

> +
> +/* Submit a previously reserved sample into the ring buffer.
> + */

nit: this is single-line comment


> +void ring_buffer_user__submit(struct ring_buffer_user *rb, void *sample)
> +{
> +       __ring_buffer_user__commit(rb);
> +}

this made me think that it's probably best to add kernel support for
busy bit anyways (just like for existing ringbuf), so that we can
eventually turn this into multi-producer on user-space side (all we
need is a lock, really). So let's anticipate that on kernel side from
the very beginning

> +
> +/* Reserve a pointer to a sample in the user ring buffer. This function is *not*
> + * thread safe, and the ring-buffer supports only a single producer.
> + */
> +void *ring_buffer_user__reserve(struct ring_buffer_user *rb, uint32_t size)
> +{
> +       uint32_t *hdr;
> +       /* 64-bit to avoid overflow in case of extreme application behavior */
> +       size_t avail_size, total_size, max_size;

size_t is not guaranteed to be 64-bit, neither is long

> +       unsigned long cons_pos, prod_pos;
> +
> +       /* Synchronizes with smp_store_release() in __bpf_user_ringbuf_poll() in
> +        * the kernel.
> +        */
> +       cons_pos = smp_load_acquire(rb->consumer_pos);
> +       /* Synchronizes with smp_store_release() in __ring_buffer_user__commit()
> +        */
> +       prod_pos = smp_load_acquire(rb->producer_pos);
> +
> +       max_size = rb->mask + 1;
> +       avail_size = max_size - (prod_pos - cons_pos);
> +       total_size = size + BPF_RINGBUF_HDR_SZ;

round_up(8)

> +
> +       if (total_size > max_size || avail_size < total_size)
> +               return NULL;

set errno as well?

> +
> +       hdr = rb->data + (prod_pos & rb->mask);
> +       *hdr = size;

I'd make sure that entire 8 bytes of header are zero-initialized, for
better future extensibility

> +
> +       /* Producer pos is updated when a sample is submitted. */
> +
> +       return (void *)rb->data + ((prod_pos + BPF_RINGBUF_HDR_SZ) & rb->mask);
> +}
> +
> +/* Poll for available space in the ringbuffer, and reserve a record when it
> + * becomes available.
> + */
> +void *ring_buffer_user__poll(struct ring_buffer_user *rb, uint32_t size,
> +                            int timeout_ms)
> +{
> +       int cnt;
> +
> +       cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, timeout_ms);
> +       if (cnt < 0)
> +               return NULL;
> +
> +       return ring_buffer_user__reserve(rb, size);

it's not clear how just doing epoll_wait() guarantees that we have >=
size of space available?.. Seems like some tests are missing?

> +}
> --
> 2.30.2
>
David Vernet Aug. 12, 2022, 5:28 p.m. UTC | #2
On Thu, Aug 11, 2022 at 04:39:57PM -0700, Andrii Nakryiko wrote:

[...]

> > diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
> > index fc589fc8eb7c..a10558e79ec8 100644
> > --- a/kernel/bpf/ringbuf.c
> > +++ b/kernel/bpf/ringbuf.c
> > @@ -639,7 +639,9 @@ static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample,
> >         if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
> >                 return -EBUSY;
> >
> > -       /* Synchronizes with smp_store_release() in user-space. */
> > +       /* Synchronizes with smp_store_release() in __ring_buffer_user__commit()
> > +        * in user-space.
> > +        */
> 
> let's not hard-code libbpf function names in kernel comments, it's
> still user-space

Fair enough.

> >         prod_pos = smp_load_acquire(&rb->producer_pos);
> >         /* Synchronizes with smp_store_release() in
> >          * __bpf_user_ringbuf_sample_release().
> > @@ -695,6 +697,9 @@ __bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
> >         /* To release the ringbuffer, just increment the producer position to
> >          * signal that a new sample can be consumed. The busy bit is cleared by
> >          * userspace when posting a new sample to the ringbuffer.
> > +        *
> > +        * Synchronizes with smp_load_acquire() in ring_buffer_user__reserve()
> > +        * in user-space.
> >          */
> 
> same
> 
> >         smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
> >                           BPF_RINGBUF_HDR_SZ);
> > diff --git a/tools/lib/bpf/libbpf.c b/tools/lib/bpf/libbpf.c
> > index 9c1f2d09f44e..f7fe09dce643 100644
> > --- a/tools/lib/bpf/libbpf.c
> > +++ b/tools/lib/bpf/libbpf.c
> > @@ -2367,6 +2367,12 @@ static size_t adjust_ringbuf_sz(size_t sz)
> >         return sz;
> >  }
> >
> > +static bool map_is_ringbuf(const struct bpf_map *map)
> > +{
> > +       return map->def.type == BPF_MAP_TYPE_RINGBUF ||
> > +              map->def.type == BPF_MAP_TYPE_USER_RINGBUF;
> > +}
> > +
> >  static void fill_map_from_def(struct bpf_map *map, const struct btf_map_def *def)
> >  {
> >         map->def.type = def->map_type;
> > @@ -2381,7 +2387,7 @@ static void fill_map_from_def(struct bpf_map *map, const struct btf_map_def *def
> >         map->btf_value_type_id = def->value_type_id;
> >
> >         /* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */
> > -       if (map->def.type == BPF_MAP_TYPE_RINGBUF)
> > +       if (map_is_ringbuf(map))
> >                 map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries);
> >
> >         if (def->parts & MAP_DEF_MAP_TYPE)
> > @@ -4363,7 +4369,7 @@ int bpf_map__set_max_entries(struct bpf_map *map, __u32 max_entries)
> >         map->def.max_entries = max_entries;
> >
> >         /* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */
> > -       if (map->def.type == BPF_MAP_TYPE_RINGBUF)
> > +       if (map_is_ringbuf(map))
> >                 map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries);
> >
> >         return 0;
> > diff --git a/tools/lib/bpf/libbpf.h b/tools/lib/bpf/libbpf.h
> > index 61493c4cddac..6d1d0539b08d 100644
> > --- a/tools/lib/bpf/libbpf.h
> > +++ b/tools/lib/bpf/libbpf.h
> > @@ -1009,6 +1009,7 @@ LIBBPF_API int bpf_tc_query(const struct bpf_tc_hook *hook,
> >
> >  /* Ring buffer APIs */
> >  struct ring_buffer;
> > +struct ring_buffer_user;
> 
> 
> I know that I'm the one asking to use ring_buffer_user name, but given
> kernel is using USER_RINGBUF and user_ringbuf naming, let's be
> consistent and use user_ring_buffer in libbpf as well. I was
> contemplating uring_buffer, can't make up my mind if it's better or
> not.

Not a problem, I'll revert it back to user_ring_buffer.

> Also ring_buffer_user reads like "user of ring buffer", which adds to
> confusion :)

[...]

> > +static void __user_ringbuf_unmap_ring(struct ring_buffer_user *rb)
> 
> libbpf generally doesn't use double underscore naming pattern, let's
> not do that here as well

Ack.

> > +{
> > +       if (rb->consumer_pos) {
> > +               munmap(rb->consumer_pos, rb->page_size);
> > +               rb->consumer_pos = NULL;
> > +       }
> > +       if (rb->producer_pos) {
> > +               munmap(rb->producer_pos, rb->page_size + 2 * (rb->mask + 1));
> > +               rb->producer_pos = NULL;
> > +       }
> > +}
> > +
> > +void ring_buffer_user__free(struct ring_buffer_user *rb)
> > +{
> > +       if (!rb)
> > +               return;
> > +
> > +       __user_ringbuf_unmap_ring(rb);
> > +
> > +       if (rb->epoll_fd >= 0)
> > +               close(rb->epoll_fd);
> > +
> > +       free(rb);
> > +}
> > +
> > +static int __ring_buffer_user_map(struct ring_buffer_user *rb, int map_fd)
> > +{
> > +
> 
> please don't leave stray empty lines around the code

Sorry, not sure how I missed that.

> > +static void __ring_buffer_user__commit(struct ring_buffer_user *rb)
> > +{
> > +       uint32_t *hdr;
> > +       uint32_t total_len;
> > +       unsigned long prod_pos;
> > +
> > +       prod_pos = *rb->producer_pos;
> > +       hdr = rb->data + (prod_pos & rb->mask);
> > +
> > +       total_len = *hdr + BPF_RINGBUF_HDR_SZ;
> 
> round up to multiple of 8

Will do, and I'll also validate this in the kernel.
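
In other words, something along these lines (an open-coded sketch of the 8-byte
alignment, since this is user-space code):

	total_len = (*hdr + BPF_RINGBUF_HDR_SZ + 7) & ~7U;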

> > +
> > +       /* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_poll() in
> > +        * the kernel.
> > +        */
> > +       smp_store_release(rb->producer_pos, prod_pos + total_len);
> > +}
> > +
> > +/* Discard a previously reserved sample into the ring buffer. Because the user
> > + * ringbuffer is assumed to be single producer, this can simply be a no-op, and
> > + * the producer pointer is left in the same place as when it was reserved.
> > + */
> > +void ring_buffer_user__discard(struct ring_buffer_user *rb, void *sample)
> > +{}
> 
> {
> }

Ack.

> > +
> > +/* Submit a previously reserved sample into the ring buffer.
> > + */
> 
> nit: this is single-line comment

Ack.

> > +void ring_buffer_user__submit(struct ring_buffer_user *rb, void *sample)
> > +{
> > +       __ring_buffer_user__commit(rb);
> > +}
> 
> this made me think that it's probably best to add kernel support for
> busy bit anyways (just like for existing ringbuf), so that we can
> eventually turn this into multi-producer on user-space side (all we
> need is a lock, really). So let's anticipate that on kernel side from
> the very beginning

Hmm, yeah, fair enough. We have the extra space in the sample header to OR the
busy bit, and we already have a 2-stage reserve -> commit workflow, so we might
as well. I'll go ahead and add this, and then hopefully someday we can just add
a lock, as you mentioned.
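
A sketch of how the header could carry that state on the user-space side,
mirroring the kernel ringbuf conventions (BPF_RINGBUF_BUSY_BIT /
BPF_RINGBUF_DISCARD_BIT); the exact layout is an assumption here:

	/* In reserve: publish the length with the busy bit set so the kernel
	 * skips the sample until it is committed.
	 */
	*hdr = size | BPF_RINGBUF_BUSY_BIT;

	/* In submit (or discard): clear the busy bit (and, for discards, set
	 * the discard bit) so the kernel can consume or skip the sample.
	 */
	smp_store_release(hdr, size);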

> > +/* Reserve a pointer to a sample in the user ring buffer. This function is *not*
> > + * thread safe, and the ring-buffer supports only a single producer.
> > + */
> > +void *ring_buffer_user__reserve(struct ring_buffer_user *rb, uint32_t size)
> > +{
> > +       uint32_t *hdr;
> > +       /* 64-bit to avoid overflow in case of extreme application behavior */
> > +       size_t avail_size, total_size, max_size;
> 
> size_t is not guaranteed to be 64-bit, neither is long

Sorry, you're right. I'll just use explicit bit-width types.

> > +       unsigned long cons_pos, prod_pos;
> > +
> > +       /* Synchronizes with smp_store_release() in __bpf_user_ringbuf_poll() in
> > +        * the kernel.
> > +        */
> > +       cons_pos = smp_load_acquire(rb->consumer_pos);
> > +       /* Synchronizes with smp_store_release() in __ring_buffer_user__commit()
> > +        */
> > +       prod_pos = smp_load_acquire(rb->producer_pos);
> > +
> > +       max_size = rb->mask + 1;
> > +       avail_size = max_size - (prod_pos - cons_pos);
> > +       total_size = size + BPF_RINGBUF_HDR_SZ;
> 
> round_up(8)

Ack.

> > +
> > +       if (total_size > max_size || avail_size < total_size)
> > +               return NULL;
> 
> set errno as well?

Will do.

> > +
> > +       hdr = rb->data + (prod_pos & rb->mask);
> > +       *hdr = size;
> 
> I'd make sure that entire 8 bytes of header are zero-initialized, for
> better future extensibility

Will do.

> > +
> > +       /* Producer pos is updated when a sample is submitted. */
> > +
> > +       return (void *)rb->data + ((prod_pos + BPF_RINGBUF_HDR_SZ) & rb->mask);
> > +}
> > +
> > +/* Poll for available space in the ringbuffer, and reserve a record when it
> > + * becomes available.
> > + */
> > +void *ring_buffer_user__poll(struct ring_buffer_user *rb, uint32_t size,
> > +                            int timeout_ms)
> > +{
> > +       int cnt;
> > +
> > +       cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, timeout_ms);
> > +       if (cnt < 0)
> > +               return NULL;
> > +
> > +       return ring_buffer_user__reserve(rb, size);
> 
> it's not clear how just doing epoll_wait() guarantees that we have >=
> size of space available?.. Seems like some tests are missing?

Right now, the kernel only kicks the polling writer once it's drained all
of the samples from the ring buffer. So at this point, if there's not
enough size in the buffer, there would be nothing we could do regardless.
This seemed like reasonable, simple behavior for the initial
implementation. I can make it a bit more intelligent if you'd like, and
return EPOLLWRNORM as soon as there is any space in the buffer, and have
libbpf potentially make multiple calls to epoll_wait() until enough space
has become available.
Andrii Nakryiko Aug. 16, 2022, 7:09 p.m. UTC | #3
On Fri, Aug 12, 2022 at 10:28 AM David Vernet <void@manifault.com> wrote:
>
> On Thu, Aug 11, 2022 at 04:39:57PM -0700, Andrii Nakryiko wrote:
>
> [...]
>
> > > diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
> > > index fc589fc8eb7c..a10558e79ec8 100644
> > > --- a/kernel/bpf/ringbuf.c
> > > +++ b/kernel/bpf/ringbuf.c
> > > @@ -639,7 +639,9 @@ static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample,
> > >         if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
> > >                 return -EBUSY;
> > >

[...]

> > > +void ring_buffer_user__submit(struct ring_buffer_user *rb, void *sample)
> > > +{
> > > +       __ring_buffer_user__commit(rb);
> > > +}
> >
> > this made me think that it's probably best to add kernel support for
> > busy bit anyways (just like for existing ringbuf), so that we can
> > eventually turn this into multi-producer on user-space side (all we
> > need is a lock, really). So let's anticipate that on kernel side from
> > the very beginning
>
> Hmm, yeah, fair enough. We have the extra space in the sample header to OR the
> busy bit, and we already have a 2-stage reserve -> commit workflow, so we might
> as well. I'll go ahead and add this, and then hopefully someday we can just add
> a lock, as you mentioned.

Right. We can probably also just document that the reserve() step is the
only one that needs serialization among multiple producers (and
currently is required to be taken care of by the user app), while the commit
(submit/discard) operation is thread-safe and needs no
synchronization.

The only reason we don't add it to libbpf right now is because we are
unsure about taking explicit dependency on pthread library. But I also
just found [0], so I don't know, maybe we should use that? I wonder if
it's supported by musl and other less full-featured libc
implementations, though.

  [0] https://www.gnu.org/software/libc/manual/html_node/ISO-C-Mutexes.html
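
For illustration, serializing only the reserve() step with ISO C mutexes could
look roughly like the sketch below; the wrapper name is hypothetical and assumes
the busy-bit scheme discussed above is in place:

#include <stdint.h>
#include <threads.h>
#include <bpf/libbpf.h>

static mtx_t reserve_lock; /* mtx_init(&reserve_lock, mtx_plain) once at startup */

static void *reserve_serialized(struct ring_buffer_user *rb, uint32_t size)
{
	void *sample;

	mtx_lock(&reserve_lock);
	sample = ring_buffer_user__reserve(rb, size);
	mtx_unlock(&reserve_lock);

	/* submit()/discard() on the returned sample would need no extra
	 * locking under the scheme described above.
	 */
	return sample;
}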

>
> > > +/* Reserve a pointer to a sample in the user ring buffer. This function is *not*
> > > + * thread safe, and the ring-buffer supports only a single producer.
> > > + */
> > > +void *ring_buffer_user__reserve(struct ring_buffer_user *rb, uint32_t size)
> > > +{
> > > +       uint32_t *hdr;
> > > +       /* 64-bit to avoid overflow in case of extreme application behavior */
> > > +       size_t avail_size, total_size, max_size;
> >
> > size_t is not guaranteed to be 64-bit, neither is long

[...]

> > > +/* Poll for available space in the ringbuffer, and reserve a record when it
> > > + * becomes available.
> > > + */
> > > +void *ring_buffer_user__poll(struct ring_buffer_user *rb, uint32_t size,
> > > +                            int timeout_ms)
> > > +{
> > > +       int cnt;
> > > +
> > > +       cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, timeout_ms);
> > > +       if (cnt < 0)
> > > +               return NULL;
> > > +
> > > +       return ring_buffer_user__reserve(rb, size);
> >
> > it's not clear how just doing epoll_wait() guarantees that we have >=
> > size of space available?.. Seems like some tests are missing?
>
> Right now, the kernel only kicks the polling writer once it's drained all
> of the samples from the ring buffer. So at this point, if there's not
> enough size in the buffer, there would be nothing we could do regardless.
> This seemed like reasonable, simple behavior for the initial
> implementation. I can make it a bit more intelligent if you'd like, and
> return EPOLLWRNORM as soon as there is any space in the buffer, and have
> libbpf potentially make multiple calls to epoll_wait() until enough space
> has become available.

So this "drain all samples" notion is not great: you can end drain
prematurely and thus not really drain all the data in ringbuf. With
multiple producers there could also be always more data coming in in
parallel. Plus, when in the future we'll have BPF program associated
with such ringbuf on the kernel side, we won't have a notion of
draining queue, we'll be just submitting record and letting kernel
handle it eventually.

So I think yeah, you'd have to send notification when at least one
sample gets consumed. The problem is that it's going to be a
performance hit, potentially, if you are going to do this notification
for each consumed sample. BPF ringbuf gets somewhat around that by
using heuristic to avoid notification if we see that consumer is still
behind kernel when kernel submits a new sample.

I don't know if we can do anything clever here for waiting for some
space to be available...  Any thoughts?

As for making libbpf loop until enough space is available... I guess
that would be the only reasonable implementation, right? I wonder if
calling it "user_ring_buffer__reserve_blocking()" would be a better
name than just "poll", though?
David Vernet Aug. 17, 2022, 2:02 p.m. UTC | #4
On Tue, Aug 16, 2022 at 12:09:53PM -0700, Andrii Nakryiko wrote:
> > > > +void ring_buffer_user__submit(struct ring_buffer_user *rb, void *sample)
> > > > +{
> > > > +       __ring_buffer_user__commit(rb);
> > > > +}
> > >
> > > this made me think that it's probably best to add kernel support for
> > > busy bit anyways (just like for existing ringbuf), so that we can
> > > eventually turn this into multi-producer on user-space side (all we
> > > need is a lock, really). So let's anticipate that on kernel side from
> > > the very beginning
> >
> > Hmm, yeah, fair enough. We have the extra space in the sample header to OR the
> > busy bit, and we already have a 2-stage reserve -> commit workflow, so we might
> > as well. I'll go ahead and add this, and then hopefully someday we can just add
> > a lock, as you mentioned.
> 
> Right. We can probably also just document that the reserve() step is the
> only one that needs serialization among multiple producers (and
> currently is required to be taken care of by the user app), while the commit
> (submit/discard) operation is thread-safe and needs no
> synchronization.

Sounds good.

> The only reason we don't add it to libbpf right now is because we are
> unsure about taking explicit dependency on pthread library. But I also
> just found [0], so I don't know, maybe we should use that? I wonder if
> it's supported by musl and other less full-featured libc
> implementations, though.
> 
>   [0] https://www.gnu.org/software/libc/manual/html_node/ISO-C-Mutexes.html

IMHO, and others may disagree, if it's in the C standard it seems like it
should be fair game to add to libbpf? Also FWIW, it looks like musl does
support it.  See mtx_*.c in [0].

[0] https://git.musl-libc.org/cgit/musl/tree/src/thread

That being said, I would like to try and keep this decision outside the
scope of user-ringbuf though, if possible. Would you be OK with this landing
as is (modulo further discussion, revisions, etc., of course), and then
we can update this implementation to be multi-producer if and when we've
added something like mtx_t support in a follow-on patch-set?

[...]

> > > > +/* Poll for available space in the ringbuffer, and reserve a record when it
> > > > + * becomes available.
> > > > + */
> > > > +void *ring_buffer_user__poll(struct ring_buffer_user *rb, uint32_t size,
> > > > +                            int timeout_ms)
> > > > +{
> > > > +       int cnt;
> > > > +
> > > > +       cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, timeout_ms);
> > > > +       if (cnt < 0)
> > > > +               return NULL;
> > > > +
> > > > +       return ring_buffer_user__reserve(rb, size);
> > >
> > > it's not clear how just doing epoll_wait() guarantees that we have >=
> > > size of space available?.. Seems like some tests are missing?
> >
> > Right now, the kernel only kicks the polling writer once it's drained all
> > of the samples from the ring buffer. So at this point, if there's not
> > enough size in the buffer, there would be nothing we could do regardless.
> > This seemed like reasonable, simple behavior for the initial
> > implementation. I can make it a bit more intelligent if you'd like, and
> return EPOLLWRNORM as soon as there is any space in the buffer, and have
> > libbpf potentially make multiple calls to epoll_wait() until enough space
> > has become available.
> 
> So this "drain all samples" notion is not great: you can end drain
> prematurely and thus not really drain all the data in ringbuf.With
> multiple producers there could also be always more data coming in in
> parallel. Plus, when in the future we'll have BPF program associated
> with such ringbuf on the kernel side, we won't have a notion of
> draining queue, we'll be just submitting record and letting kernel
> handle it eventually.

I don't disagree with any of your points. I think what we'll have to
decide on is a trade-off between performance and usability. As you pointed
out, if we only kick user-space once the ringbuffer is empty, that imposes
the requirement on the kernel that it will always drain the ringbuffer.
That might not even be possible though if we have multiple producers
posting data in parallel.

More on this below, but the TL;DR is that I agree with you, and I think
having a model where we kick user-space whenever a sample is consumed from
the buffer is a lot easier to reason about, and probably our only option if
our plan is to make the ringbuffer MPMC. I'll make this change in v3.

> So I think yeah, you'd have to send notification when at least one
> sample gets consumed. The problem is that it's going to be a
> performance hit, potentially, if you are going to do this notification
> for each consumed sample. BPF ringbuf gets somewhat around that by
> using heuristic to avoid notification if we see that consumer is still
> behind kernel when kernel submits a new sample.

Something perhaps worth pointing out here is that this heuristic works
because the kernel-producer ringbuffer is MPSC. If it were MPMC, we'd
potentially have the same problem you pointed out above where you'd never
wake up an epoll-waiter because other consumers would drain the buffer, and
by the time the kernel got around to posting another sample, could observe
that consumer_pos == producer_pos, and either wouldn't wake up anyone on
the waitq, or wouldn't return any events from ringbuf_map_poll(). If our
intention is to make user-space ringbuffers MPMC, it becomes more difficult
to use these nice heuristics.

> I don't know if we can do anything clever here for waiting for some
> space to be available...  Any thoughts?

Hmmm, yeah, nothing clever is coming to mind. The problem is that we can't
make assumptions about why user-space would be epoll-waiting on the
ringbuffer, because it's a producer, and the user-space producer is
free to post variably sized samples.

For example, I was initially considering whether we could do a heuristic
where we notify the producer only if the buffer was previously full /
producer_pos was ringbuf size away from consumer_pos when we drained a
sample, but that doesn't work at all because there could be space in the
ringbuffer, but user-space is epoll-waiting for *more* space to become
available for some large sample that it wants to publish.

I think the only options we have are:

1. Send a notification (i.e. schedule bpf_ringbuf_notify() using
   irq_work_queue(), and then return EPOLLOUT | EPOLLWRNORM if
   the ringbuffer is not full), every time a sample is drained.

2. Keep the behavior in v1, wherein we have a contract that the kernel will
   always eventually drain the ringbuffer, and will kick user-space when the
   buffer is empty. I think a requirement here would also be that the
   ringbuffer would be SPMC, or we decide that it's acceptable for some
   producers to sleep indefinitely if other producers keep reading samples,
   and that the caller just needs to be aware of this as a possibility.

My two cents are that we go with (1), and then consider our options later
if we need to optimize.
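
For concreteness, option (1) amounts to roughly the following on the kernel
side, sketched against __bpf_user_ringbuf_sample_release() from this series
(the exact placement is an assumption; rb->work and irq_work_queue() mirror the
existing BPF ringbuf notification path):

	smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
			  BPF_RINGBUF_HDR_SZ);
	/* Kick any user-space producer that is epoll-waiting for free space. */
	irq_work_queue(&rb->work);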

> As for making libbpf loop until enough space is available... I guess
> that would be the only reasonable implementation, right? I wonder if
> calling it "user_ring_buffer__reserve_blocking()" would be a better
> name than just "poll", though?

I went with user_ring_buffer__poll() to match the complementary user-space
consumer function for kernel-producer ringbuffers:
ring_buffer__poll(). I personally prefer
user_ring_buffer__reserve_blocking() because the fact that it's doing an
epoll-wait is entirely an implementation detail. I'll go ahead and make
that change in v3.
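
A rough sketch of what that blocking reserve could look like on top of this
version of the patch; the function name, and especially the simplified timeout
handling, are assumptions:

/* Sketch only: retries the reservation each time the kernel signals that it
 * consumed samples. A real implementation would also decrement timeout_ms by
 * the elapsed time on every iteration.
 */
void *user_ring_buffer__reserve_blocking(struct ring_buffer_user *rb,
					 uint32_t size, int timeout_ms)
{
	void *sample;
	int cnt;

	for (;;) {
		/* epoll only says that *some* space was freed, so re-check
		 * whether "size" bytes can actually be reserved.
		 */
		sample = ring_buffer_user__reserve(rb, size);
		if (sample)
			return sample;

		cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, timeout_ms);
		if (cnt < 0)
			return NULL; /* errno was set by epoll_wait() */
		if (cnt == 0) {
			errno = ETIMEDOUT;
			return NULL;
		}
	}
}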

Thanks,
David
David Vernet Aug. 18, 2022, 3:05 a.m. UTC | #5
On Wed, Aug 17, 2022 at 09:02:14AM -0500, David Vernet wrote:

[...]

> > > > > +/* Poll for available space in the ringbuffer, and reserve a record when it
> > > > > + * becomes available.
> > > > > + */
> > > > > +void *ring_buffer_user__poll(struct ring_buffer_user *rb, uint32_t size,
> > > > > +                            int timeout_ms)
> > > > > +{
> > > > > +       int cnt;
> > > > > +
> > > > > +       cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, timeout_ms);
> > > > > +       if (cnt < 0)
> > > > > +               return NULL;
> > > > > +
> > > > > +       return ring_buffer_user__reserve(rb, size);
> > > >
> > > > it's not clear how just doing epoll_wait() guarantees that we have >=
> > > > size of space available?.. Seems like some tests are missing?
> > >
> > > Right now, the kernel only kicks the polling writer once it's drained all
> > > of the samples from the ring buffer. So at this point, if there's not
> > > enough size in the buffer, there would be nothing we could do regardless.
> > > This seemed like reasonable, simple behavior for the initial
> > > implementation. I can make it a bit more intelligent if you'd like, and
> > > return EPOLLWRNORM as soon as there is any space in the buffer, and have
> > > libbpf potentially make multiple calls to epoll_wait() until enough space
> > > has become available.
> > 
> > So this "drain all samples" notion is not great: you can end drain
> > prematurely and thus not really drain all the data in ringbuf.With
> > multiple producers there could also be always more data coming in in
> > parallel. Plus, when in the future we'll have BPF program associated
> > with such ringbuf on the kernel side, we won't have a notion of
> > draining queue, we'll be just submitting record and letting kernel
> > handle it eventually.
> 
> I don't disagree with any of your points. I think what we'll have to
> decide on is a trade-off between performance and usability. As you pointed
> out, if we only kick user-space once the ringbuffer is empty, that imposes
> the requirement on the kernel that it will always drain the ringbuffer.
> That might not even be possible though if we have multiple producers
> posting data in parallel.
> 
> More on this below, but the TL;DR is that I agree with you, and I think
> having a model where we kick user-space whenever a sample is consumed from
> the buffer is a lot easier to reason about, and probably our only option if
> our plan is to make the ringbuffer MPMC. I'll make this change in v3.
> 
> > So I think yeah, you'd have to send notification when at least one
> > sample gets consumed. The problem is that it's going to be a
> > performance hit, potentially, if you are going to do this notification
> > for each consumed sample. BPF ringbuf gets somewhat around that by
> > using heuristic to avoid notification if we see that consumer is still
> > behind kernel when kernel submits a new sample.
> 
> Something perhaps worth pointing out here is that this heuristic works
> because the kernel-producer ringbuffer is MPSC. If it were MPMC, we'd
> potentially have the same problem you pointed out above where you'd never
> wake up an epoll-waiter because other consumers would drain the buffer, and
> by the time the kernel got around to posting another sample, could observe
> that consumer_pos == producer_pos, and either wouldn't wake up anyone on
> the waitq, or wouldn't return any events from ringbuf_map_poll(). If our
> intention is to make user-space ringbuffers MPMC, it becomes more difficult
> to use these nice heuristics.
> 
> > I don't know if we can do anything clever here for waiting for some
> > space to be available...  Any thoughts?
> 
> Hmmm, yeah, nothing clever is coming to mind. The problem is that we can't
> make assumptions about why user-space would be epoll-waiting on the
> ringbuffer, because it's a producer, and the user-space producer is
> free to post variably sized samples.
> 
> For example, I was initially considering whether we could do a heuristic
> where we notify the producer only if the buffer was previously full /
> producer_pos was ringbuf size away from consumer_pos when we drained a
> sample, but that doesn't work at all because there could be space in the
> ringbuffer, but user-space is epoll-waiting for *more* space to become
> available for some large sample that it wants to publish.
> 
> I think the only options we have are:
> 
> 1. Send a notification (i.e. schedule bpf_ringbuf_notify() using
>    irq_work_queue(), and then return EPOLLOUT | EPOLLWRNORM if
>    the ringbuffer is not full), every time a sample is drained.
> 
> 2. Keep the behavior in v1, wherein we have a contract that the kernel will
>    always eventually drain the ringbuffer, and will kick user-space when the
>    buffer is empty. I think a requirement here would also be that the
>    ringbuffer would be SPMC, or we decide that it's acceptable for some
>    producers to sleep indefinitely if other producers keep reading samples,
>    and that the caller just needs to be aware of this as a possibility.
> 
> My two cents are that we go with (1), and then consider our options later
> if we need to optimize.

Unfortunately this causes the system to hang because of how many times
we're invoking irq_work_queue() in rapid succession.

So, I'm not sure that notifying the user-space producer after _every_
sample is going to work. Another option that occurred to me, however, is to
augment the initial approach taken in v1. Rather than only sending a
notification when the ringbuffer is empty, we could instead guarantee that
we'll always send a notification when bpf_ringbuf_drain() is called and at
least one message was consumed. This is slightly stronger than our earlier
guarantee because we may send a notification even if the ringbuffer is only
partially drained, and it avoids some of the associated pitfalls that you
pointed out (e.g. that a BPF program is now responsible for always draining
the ringbuffer in order to ensure that an epoll-waiting user-space producer
will always be woken up).

The down-side of this is that it still may not be aggressive enough in
waking up user-space producers who could have had data to post as soon as
space was available in the ringbuffer. It won't potentially cause them to
block indefinitely as the first approach did, but they'll still have to
wait for the end of bpf_ringbuf_drain() to be woken up, when they could
potentially have been woken up sooner to start publishing samples. It's
hard to say whether this will be prohibitive to the feature being usable,
but in my opinion it's more useful to have this functionality (and make its
behavior very clearly described in comments and the public documentation
for the APIs) in an imperfect form than to not have it at all.

Additionally, a plus-side to this approach is that it gives us some
flexibility to send more event notifications if we want to. For example, we
could add a heuristic where we always send a notification if the buffer was
previously full before a sample was consumed (in contrast to always
invoking irq_work_queue() _whenever_ a sample is consumed), and then always
send a notification at the end of bpf_ringbuf_drain() to catch any
producers who were waiting for a larger sample than what was afforded by
the initial heuristic notification. I think this should address the likely
common use-case of samples being statically sized (so posting an event as
soon as the ringbuffer is no longer empty should almost always be
sufficient).

We discussed this offline, so I'll go ahead and implement it for the next
version. One thing we didn't discuss was the example heuristic described
above, but I'm going to include that because it seems like the
complementary behavior to what we do for kernel-producer ringbuffers. If
folks aren't a fan, we can remove it in favor of only sending a single
notification in bpf_ringbuf_drain().

Thanks,
David

Patch

diff --git a/kernel/bpf/ringbuf.c b/kernel/bpf/ringbuf.c
index fc589fc8eb7c..a10558e79ec8 100644
--- a/kernel/bpf/ringbuf.c
+++ b/kernel/bpf/ringbuf.c
@@ -639,7 +639,9 @@  static int __bpf_user_ringbuf_poll(struct bpf_ringbuf *rb, void **sample,
 	if (!atomic_try_cmpxchg(&rb->busy, &busy, 1))
 		return -EBUSY;
 
-	/* Synchronizes with smp_store_release() in user-space. */
+	/* Synchronizes with smp_store_release() in __ring_buffer_user__commit()
+	 * in user-space.
+	 */
 	prod_pos = smp_load_acquire(&rb->producer_pos);
 	/* Synchronizes with smp_store_release() in
 	 * __bpf_user_ringbuf_sample_release().
@@ -695,6 +697,9 @@  __bpf_user_ringbuf_sample_release(struct bpf_ringbuf *rb, size_t size,
 	/* To release the ringbuffer, just increment the producer position to
 	 * signal that a new sample can be consumed. The busy bit is cleared by
 	 * userspace when posting a new sample to the ringbuffer.
+	 *
+	 * Synchronizes with smp_load_acquire() in ring_buffer_user__reserve()
+	 * in user-space.
 	 */
 	smp_store_release(&rb->consumer_pos, rb->consumer_pos + size +
 			  BPF_RINGBUF_HDR_SZ);
diff --git a/tools/lib/bpf/libbpf.c b/tools/lib/bpf/libbpf.c
index 9c1f2d09f44e..f7fe09dce643 100644
--- a/tools/lib/bpf/libbpf.c
+++ b/tools/lib/bpf/libbpf.c
@@ -2367,6 +2367,12 @@  static size_t adjust_ringbuf_sz(size_t sz)
 	return sz;
 }
 
+static bool map_is_ringbuf(const struct bpf_map *map)
+{
+	return map->def.type == BPF_MAP_TYPE_RINGBUF ||
+	       map->def.type == BPF_MAP_TYPE_USER_RINGBUF;
+}
+
 static void fill_map_from_def(struct bpf_map *map, const struct btf_map_def *def)
 {
 	map->def.type = def->map_type;
@@ -2381,7 +2387,7 @@  static void fill_map_from_def(struct bpf_map *map, const struct btf_map_def *def
 	map->btf_value_type_id = def->value_type_id;
 
 	/* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */
-	if (map->def.type == BPF_MAP_TYPE_RINGBUF)
+	if (map_is_ringbuf(map))
 		map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries);
 
 	if (def->parts & MAP_DEF_MAP_TYPE)
@@ -4363,7 +4369,7 @@  int bpf_map__set_max_entries(struct bpf_map *map, __u32 max_entries)
 	map->def.max_entries = max_entries;
 
 	/* auto-adjust BPF ringbuf map max_entries to be a multiple of page size */
-	if (map->def.type == BPF_MAP_TYPE_RINGBUF)
+	if (map_is_ringbuf(map))
 		map->def.max_entries = adjust_ringbuf_sz(map->def.max_entries);
 
 	return 0;
diff --git a/tools/lib/bpf/libbpf.h b/tools/lib/bpf/libbpf.h
index 61493c4cddac..6d1d0539b08d 100644
--- a/tools/lib/bpf/libbpf.h
+++ b/tools/lib/bpf/libbpf.h
@@ -1009,6 +1009,7 @@  LIBBPF_API int bpf_tc_query(const struct bpf_tc_hook *hook,
 
 /* Ring buffer APIs */
 struct ring_buffer;
+struct ring_buffer_user;
 
 typedef int (*ring_buffer_sample_fn)(void *ctx, void *data, size_t size);
 
@@ -1028,6 +1029,24 @@  LIBBPF_API int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms);
 LIBBPF_API int ring_buffer__consume(struct ring_buffer *rb);
 LIBBPF_API int ring_buffer__epoll_fd(const struct ring_buffer *rb);
 
+struct ring_buffer_user_opts {
+	size_t sz; /* size of this struct, for forward/backward compatibility */
+};
+
+#define ring_buffer_user_opts__last_field sz
+
+LIBBPF_API struct ring_buffer_user *
+ring_buffer_user__new(int map_fd, const struct ring_buffer_user_opts *opts);
+LIBBPF_API void *ring_buffer_user__reserve(struct ring_buffer_user *rb,
+					   uint32_t size);
+LIBBPF_API void *ring_buffer_user__poll(struct ring_buffer_user *rb,
+					uint32_t size, int timeout_ms);
+LIBBPF_API void ring_buffer_user__submit(struct ring_buffer_user *rb,
+					 void *sample);
+LIBBPF_API void ring_buffer_user__discard(struct ring_buffer_user *rb,
+					  void *sample);
+LIBBPF_API void ring_buffer_user__free(struct ring_buffer_user *rb);
+
 /* Perf buffer APIs */
 struct perf_buffer;
 
diff --git a/tools/lib/bpf/libbpf.map b/tools/lib/bpf/libbpf.map
index 119e6e1ea7f1..8db11040df1b 100644
--- a/tools/lib/bpf/libbpf.map
+++ b/tools/lib/bpf/libbpf.map
@@ -365,4 +365,10 @@  LIBBPF_1.0.0 {
 		libbpf_bpf_map_type_str;
 		libbpf_bpf_prog_type_str;
 		perf_buffer__buffer;
+		ring_buffer_user__discard;
+		ring_buffer_user__free;
+		ring_buffer_user__new;
+		ring_buffer_user__poll;
+		ring_buffer_user__reserve;
+		ring_buffer_user__submit;
 };
diff --git a/tools/lib/bpf/libbpf_probes.c b/tools/lib/bpf/libbpf_probes.c
index 6d495656f554..f3a8e8e74eb8 100644
--- a/tools/lib/bpf/libbpf_probes.c
+++ b/tools/lib/bpf/libbpf_probes.c
@@ -231,6 +231,7 @@  static int probe_map_create(enum bpf_map_type map_type)
 			return btf_fd;
 		break;
 	case BPF_MAP_TYPE_RINGBUF:
+	case BPF_MAP_TYPE_USER_RINGBUF:
 		key_size = 0;
 		value_size = 0;
 		max_entries = 4096;
diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c
index 8bc117bcc7bc..86e3c11d8486 100644
--- a/tools/lib/bpf/ringbuf.c
+++ b/tools/lib/bpf/ringbuf.c
@@ -39,6 +39,17 @@  struct ring_buffer {
 	int ring_cnt;
 };
 
+struct ring_buffer_user {
+	struct epoll_event event;
+	unsigned long *consumer_pos;
+	unsigned long *producer_pos;
+	void *data;
+	unsigned long mask;
+	size_t page_size;
+	int map_fd;
+	int epoll_fd;
+};
+
 static void ringbuf_unmap_ring(struct ring_buffer *rb, struct ring *r)
 {
 	if (r->consumer_pos) {
@@ -300,3 +311,208 @@  int ring_buffer__epoll_fd(const struct ring_buffer *rb)
 {
 	return rb->epoll_fd;
 }
+
+static void __user_ringbuf_unmap_ring(struct ring_buffer_user *rb)
+{
+	if (rb->consumer_pos) {
+		munmap(rb->consumer_pos, rb->page_size);
+		rb->consumer_pos = NULL;
+	}
+	if (rb->producer_pos) {
+		munmap(rb->producer_pos, rb->page_size + 2 * (rb->mask + 1));
+		rb->producer_pos = NULL;
+	}
+}
+
+void ring_buffer_user__free(struct ring_buffer_user *rb)
+{
+	if (!rb)
+		return;
+
+	__user_ringbuf_unmap_ring(rb);
+
+	if (rb->epoll_fd >= 0)
+		close(rb->epoll_fd);
+
+	free(rb);
+}
+
+static int __ring_buffer_user_map(struct ring_buffer_user *rb, int map_fd)
+{
+
+	struct bpf_map_info info;
+	__u32 len = sizeof(info);
+	void *tmp;
+	struct epoll_event *rb_epoll;
+	int err;
+
+	memset(&info, 0, sizeof(info));
+
+	err = bpf_obj_get_info_by_fd(map_fd, &info, &len);
+	if (err) {
+		err = -errno;
+		pr_warn("user ringbuf: failed to get map info for fd=%d: %d\n",
+			map_fd, err);
+		return libbpf_err(err);
+	}
+
+	if (info.type != BPF_MAP_TYPE_USER_RINGBUF) {
+		pr_warn("user ringbuf: map fd=%d is not BPF_MAP_TYPE_USER_RINGBUF\n",
+			map_fd);
+		return libbpf_err(-EINVAL);
+	}
+
+	rb->map_fd = map_fd;
+	rb->mask = info.max_entries - 1;
+
+	/* Map read-only consumer page */
+	tmp = mmap(NULL, rb->page_size, PROT_READ, MAP_SHARED, map_fd, 0);
+	if (tmp == MAP_FAILED) {
+		err = -errno;
+		pr_warn("user ringbuf: failed to mmap consumer page for map fd=%d: %d\n",
+			map_fd, err);
+		return libbpf_err(err);
+	}
+	rb->consumer_pos = tmp;
+
+	/* Map read-write the producer page and data pages. We map the data
+	 * region as twice the total size of the ringbuffer to allow the simple
+	 * reading and writing of samples that wrap around the end of the
+	 * buffer.  See the kernel implementation for details.
+	 */
+	tmp = mmap(NULL, rb->page_size + 2 * info.max_entries,
+		   PROT_READ | PROT_WRITE, MAP_SHARED, map_fd, rb->page_size);
+	if (tmp == MAP_FAILED) {
+		err = -errno;
+		pr_warn("user ringbuf: failed to mmap data pages for map fd=%d: %d\n",
+			map_fd, err);
+		return libbpf_err(err);
+	}
+
+	rb->producer_pos = tmp;
+	rb->data = tmp + rb->page_size;
+
+	rb_epoll = &rb->event;
+	rb_epoll->events = EPOLLOUT;
+	if (epoll_ctl(rb->epoll_fd, EPOLL_CTL_ADD, map_fd, rb_epoll) < 0) {
+		err = -errno;
+		pr_warn("user ringbuf: failed to epoll add map fd=%d: %d\n",
+			map_fd, err);
+		return libbpf_err(err);
+	}
+
+	return 0;
+}
+
+struct ring_buffer_user *
+ring_buffer_user__new(int map_fd, const struct ring_buffer_user_opts *opts)
+{
+	struct ring_buffer_user *rb;
+	int err;
+
+	if (!OPTS_VALID(opts, ring_buffer_opts))
+		return errno = EINVAL, NULL;
+
+	rb = calloc(1, sizeof(*rb));
+	if (!rb)
+		return errno = ENOMEM, NULL;
+
+	rb->page_size = getpagesize();
+
+	rb->epoll_fd = epoll_create1(EPOLL_CLOEXEC);
+	if (rb->epoll_fd < 0) {
+		err = -errno;
+		pr_warn("user ringbuf: failed to create epoll instance: %d\n",
+			err);
+		goto err_out;
+	}
+
+	err = __ring_buffer_user_map(rb, map_fd);
+	if (err)
+		goto err_out;
+
+	return rb;
+
+err_out:
+	ring_buffer_user__free(rb);
+	return errno = -err, NULL;
+}
+
+static void __ring_buffer_user__commit(struct ring_buffer_user *rb)
+{
+	uint32_t *hdr;
+	uint32_t total_len;
+	unsigned long prod_pos;
+
+	prod_pos = *rb->producer_pos;
+	hdr = rb->data + (prod_pos & rb->mask);
+
+	total_len = *hdr + BPF_RINGBUF_HDR_SZ;
+
+	/* Synchronizes with smp_load_acquire() in __bpf_user_ringbuf_poll() in
+	 * the kernel.
+	 */
+	smp_store_release(rb->producer_pos, prod_pos + total_len);
+}
+
+/* Discard a previously reserved sample into the ring buffer. Because the user
+ * ringbuffer is assumed to be single producer, this can simply be a no-op, and
+ * the producer pointer is left in the same place as when it was reserved.
+ */
+void ring_buffer_user__discard(struct ring_buffer_user *rb, void *sample)
+{}
+
+/* Submit a previously reserved sample into the ring buffer.
+ */
+void ring_buffer_user__submit(struct ring_buffer_user *rb, void *sample)
+{
+	__ring_buffer_user__commit(rb);
+}
+
+/* Reserve a pointer to a sample in the user ring buffer. This function is *not*
+ * thread safe, and the ring-buffer supports only a single producer.
+ */
+void *ring_buffer_user__reserve(struct ring_buffer_user *rb, uint32_t size)
+{
+	uint32_t *hdr;
+	/* 64-bit to avoid overflow in case of extreme application behavior */
+	size_t avail_size, total_size, max_size;
+	unsigned long cons_pos, prod_pos;
+
+	/* Synchronizes with smp_store_release() in __bpf_user_ringbuf_poll() in
+	 * the kernel.
+	 */
+	cons_pos = smp_load_acquire(rb->consumer_pos);
+	/* Synchronizes with smp_store_release() in __ring_buffer_user__commit()
+	 */
+	prod_pos = smp_load_acquire(rb->producer_pos);
+
+	max_size = rb->mask + 1;
+	avail_size = max_size - (prod_pos - cons_pos);
+	total_size = size + BPF_RINGBUF_HDR_SZ;
+
+	if (total_size > max_size || avail_size < total_size)
+		return NULL;
+
+	hdr = rb->data + (prod_pos & rb->mask);
+	*hdr = size;
+
+	/* Producer pos is updated when a sample is submitted. */
+
+	return (void *)rb->data + ((prod_pos + BPF_RINGBUF_HDR_SZ) & rb->mask);
+}
+
+/* Poll for available space in the ringbuffer, and reserve a record when it
+ * becomes available.
+ */
+void *ring_buffer_user__poll(struct ring_buffer_user *rb, uint32_t size,
+			     int timeout_ms)
+{
+	int cnt;
+
+	cnt = epoll_wait(rb->epoll_fd, &rb->event, 1, timeout_ms);
+	if (cnt < 0)
+		return NULL;
+
+	return ring_buffer_user__reserve(rb, size);
+}