Message ID | 20240111161712.1480333-3-vdonnefort@google.com (mailing list archive) |
---|---|
State | Superseded |
Headers | show |
Series | Introducing trace buffer mapping by user-space | expand |
On 2024-01-11 11:17, Vincent Donnefort wrote: > In preparation for allowing the user-space to map a ring-buffer, add > a set of mapping functions: > > ring_buffer_{map,unmap}() > ring_buffer_map_fault() > > And controls on the ring-buffer: > > ring_buffer_map_get_reader() /* swap reader and head */ > > Mapping the ring-buffer also involves: > > A unique ID for each subbuf of the ring-buffer, currently they are > only identified through their in-kernel VA. > > A meta-page, where are stored ring-buffer statistics and a > description for the current reader > Hi Vincent, The LTTng kernel tracer has supported mmap'd buffers for nearly 15 years [1], and has a lot of similarities with this patch series. LTTng has the notion of "subbuffer id" to allow atomically exchanging a "reader" extra subbuffer with the subbuffer to be read. It implements "get subbuffer" / "put subbuffer" ioctls to allow the consumer (reader) to move the currently read subbuffer position. [2] It would not hurt to compare your approach to LTTng and highlight similarities/differences, and the rationale for the differences. Especially when it comes to designing kernel ABIs, it's good to make sure that all bases are covered, because those choices will have lasting impacts. Thanks, Mathieu [1] https://git.lttng.org/?p=lttng-modules.git;a=blob;f=src/lib/ringbuffer/ring_buffer_mmap.c [2] https://git.lttng.org/?p=lttng-modules.git;a=blob;f=src/lib/ringbuffer/ring_buffer_vfs.c
On Thu, 11 Jan 2024 11:34:58 -0500 Mathieu Desnoyers <mathieu.desnoyers@efficios.com> wrote: > The LTTng kernel tracer has supported mmap'd buffers for nearly 15 years [1], > and has a lot of similarities with this patch series. > > LTTng has the notion of "subbuffer id" to allow atomically exchanging a > "reader" extra subbuffer with the subbuffer to be read. It implements > "get subbuffer" / "put subbuffer" ioctls to allow the consumer (reader) > to move the currently read subbuffer position. [2] > > It would not hurt to compare your approach to LTTng and highlight > similarities/differences, and the rationale for the differences. > > Especially when it comes to designing kernel ABIs, it's good to make sure > that all bases are covered, because those choices will have lasting impacts. > > Thanks, > > Mathieu > > [1] https://git.lttng.org/?p=lttng-modules.git;a=blob;f=src/lib/ringbuffer/ring_buffer_mmap.c > [2] https://git.lttng.org/?p=lttng-modules.git;a=blob;f=src/lib/ringbuffer/ring_buffer_vfs.c > Hi Mathieu, Thanks for sharing! As we discussed a little bit in the tracing meeting we do somethings differently but very similar too ;-) The similarities as that all the sub-buffers are mapped. You have a reader-sub-buffer as well. The main difference is that you use an ioctl() that returns where to find the reader-sub-buffer, where our ioctl() is just "I'm done, get me a new reader-sub-buffer". Ours will update the meta page. Our meta page looks like this: > +struct trace_buffer_meta { > + unsigned long entries; > + unsigned long overrun; > + unsigned long read; If start tracing: trace-cmd start -e sched_switch and do: ~ cat /sys/kernel/tracing/per_cpu/cpu0/stats entries: 14 overrun: 0 commit overrun: 0 bytes: 992 oldest event ts: 84844.825372 now ts: 84847.102075 dropped events: 0 read events: 0 You'll see similar to the above. entries = entries overrun = overrun read = read The above "read" is total number of events read. Pretty staight forward ;-) > + > + unsigned long subbufs_touched; > + unsigned long subbufs_lost; > + unsigned long subbufs_read; Now I'm thinking we may not want this exported, as I'm not sure it's useful. Vincent, talking with Mathieu, he was suggesting that we only export what we really need, and I don't think we need the above. Especially since they are not exposed in the stats file. > + > + struct { > + unsigned long lost_events; > + __u32 id; > + __u32 read; > + } reader; The above is definitely needed, as all of it is used to read the reader-page of the sub-buffer. lost_events is the number of lost events that happened before this sub-buffer was swapped out. Hmm, Vincent, I just notice that you update the lost_events as: > +static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer) > +{ > + struct trace_buffer_meta *meta = cpu_buffer->meta_page; > + > + WRITE_ONCE(meta->entries, local_read(&cpu_buffer->entries)); > + WRITE_ONCE(meta->overrun, local_read(&cpu_buffer->overrun)); > + WRITE_ONCE(meta->read, cpu_buffer->read); > + > + WRITE_ONCE(meta->subbufs_touched, local_read(&cpu_buffer->pages_touched)); > + WRITE_ONCE(meta->subbufs_lost, local_read(&cpu_buffer->pages_lost)); > + WRITE_ONCE(meta->subbufs_read, local_read(&cpu_buffer->pages_read)); > + > + WRITE_ONCE(meta->reader.read, cpu_buffer->reader_page->read); > + WRITE_ONCE(meta->reader.id, cpu_buffer->reader_page->id); > + WRITE_ONCE(meta->reader.lost_events, cpu_buffer->lost_events); > +} The lost_events may need to be handled differently, as it doesn't always get cleared. So it may be stale data. > + > + __u32 subbuf_size; > + __u32 nr_subbufs; This gets is the information needed to read the mapped ring buffer. > + > + __u32 meta_page_size; > + __u32 meta_struct_len; The ring buffer gets mapped after "meta_page_size" and this structure is "meta_struct_len" which will change if we add new data to it. > +}; -- Steve
On Thu, Jan 11, 2024 at 06:23:20PM -0500, Steven Rostedt wrote: > On Thu, 11 Jan 2024 11:34:58 -0500 > Mathieu Desnoyers <mathieu.desnoyers@efficios.com> wrote: > > > > The LTTng kernel tracer has supported mmap'd buffers for nearly 15 years [1], > > and has a lot of similarities with this patch series. > > > > LTTng has the notion of "subbuffer id" to allow atomically exchanging a > > "reader" extra subbuffer with the subbuffer to be read. It implements > > "get subbuffer" / "put subbuffer" ioctls to allow the consumer (reader) > > to move the currently read subbuffer position. [2] > > > > It would not hurt to compare your approach to LTTng and highlight > > similarities/differences, and the rationale for the differences. > > > > Especially when it comes to designing kernel ABIs, it's good to make sure > > that all bases are covered, because those choices will have lasting impacts. > > > > Thanks, > > > > Mathieu > > > > [1] https://git.lttng.org/?p=lttng-modules.git;a=blob;f=src/lib/ringbuffer/ring_buffer_mmap.c > > [2] https://git.lttng.org/?p=lttng-modules.git;a=blob;f=src/lib/ringbuffer/ring_buffer_vfs.c > > > > Hi Mathieu, > > Thanks for sharing! > > As we discussed a little bit in the tracing meeting we do somethings > differently but very similar too ;-) > > The similarities as that all the sub-buffers are mapped. You have a > reader-sub-buffer as well. > > The main difference is that you use an ioctl() that returns where to find > the reader-sub-buffer, where our ioctl() is just "I'm done, get me a new > reader-sub-buffer". Ours will update the meta page. > > Our meta page looks like this: > > > +struct trace_buffer_meta { > > + unsigned long entries; > > + unsigned long overrun; > > + unsigned long read; > > If start tracing: trace-cmd start -e sched_switch and do: > > ~ cat /sys/kernel/tracing/per_cpu/cpu0/stats > entries: 14 > overrun: 0 > commit overrun: 0 > bytes: 992 > oldest event ts: 84844.825372 > now ts: 84847.102075 > dropped events: 0 > read events: 0 > > You'll see similar to the above. > > entries = entries > overrun = overrun > read = read > > The above "read" is total number of events read. > > Pretty staight forward ;-) > > > > + > > + unsigned long subbufs_touched; > > + unsigned long subbufs_lost; > > + unsigned long subbufs_read; > > Now I'm thinking we may not want this exported, as I'm not sure it's useful. touched and lost are not useful now, but it'll be for my support of the hypervisor tracing, that's why I added them already. subbufs_read could probably go away though as even in that case I can track that in the reader. > > Vincent, talking with Mathieu, he was suggesting that we only export what > we really need, and I don't think we need the above. Especially since they > are not exposed in the stats file. > > > > + > > + struct { > > + unsigned long lost_events; > > + __u32 id; > > + __u32 read; > > + } reader; > > The above is definitely needed, as all of it is used to read the > reader-page of the sub-buffer. > > lost_events is the number of lost events that happened before this > sub-buffer was swapped out. > > Hmm, Vincent, I just notice that you update the lost_events as: > > > +static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer) > > +{ > > + struct trace_buffer_meta *meta = cpu_buffer->meta_page; > > + > > + WRITE_ONCE(meta->entries, local_read(&cpu_buffer->entries)); > > + WRITE_ONCE(meta->overrun, local_read(&cpu_buffer->overrun)); > > + WRITE_ONCE(meta->read, cpu_buffer->read); > > + > > + WRITE_ONCE(meta->subbufs_touched, local_read(&cpu_buffer->pages_touched)); > > + WRITE_ONCE(meta->subbufs_lost, local_read(&cpu_buffer->pages_lost)); > > + WRITE_ONCE(meta->subbufs_read, local_read(&cpu_buffer->pages_read)); > > + > > + WRITE_ONCE(meta->reader.read, cpu_buffer->reader_page->read); > > + WRITE_ONCE(meta->reader.id, cpu_buffer->reader_page->id); > > + WRITE_ONCE(meta->reader.lost_events, cpu_buffer->lost_events); > > +} > > The lost_events may need to be handled differently, as it doesn't always > get cleared. So it may be stale data. My idea was to check this value after the ioctl(). If > 0 then events were lost between the that ioctl() and the previous swap. But now if with "[PATCH] ring-buffer: Have mmapped ring buffer keep track of missed events" we put this information in the page itself, we can get rid of this field. > > > > + > > + __u32 subbuf_size; > > + __u32 nr_subbufs; > > This gets is the information needed to read the mapped ring buffer. > > > + > > + __u32 meta_page_size; > > + __u32 meta_struct_len; > > The ring buffer gets mapped after "meta_page_size" and this structure is > "meta_struct_len" which will change if we add new data to it. > > > +}; > > -- Steve
On Fri, 12 Jan 2024 09:13:02 +0000 Vincent Donnefort <vdonnefort@google.com> wrote: > > > + > > > + unsigned long subbufs_touched; > > > + unsigned long subbufs_lost; > > > + unsigned long subbufs_read; > > > > Now I'm thinking we may not want this exported, as I'm not sure it's useful. > > touched and lost are not useful now, but it'll be for my support of the > hypervisor tracing, that's why I added them already. > > subbufs_read could probably go away though as even in that case I can track that > in the reader. Yeah, but I think we can leave it off for now. This is something we could extend the structure with. In fact, it can be different for different buffers! That is, since they are pretty meaningless for the normal ring buffer, I don't think we need to export it. But if it's useful for your hypervisor buffer, it can export it. It would be a good test to know if the extension works. Of course, that means if the normal ring buffer needs more info, it must also include this as well, as it will already be part of the extended structure. > > > > > Vincent, talking with Mathieu, he was suggesting that we only export what > > we really need, and I don't think we need the above. Especially since they > > are not exposed in the stats file. > > > > > > > + > > > + struct { > > > + unsigned long lost_events; > > > + __u32 id; > > > + __u32 read; > > > + } reader; > > > > The above is definitely needed, as all of it is used to read the > > reader-page of the sub-buffer. > > > > lost_events is the number of lost events that happened before this > > sub-buffer was swapped out. > > > > Hmm, Vincent, I just notice that you update the lost_events as: > > > > > +static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer) > > > +{ > > > + struct trace_buffer_meta *meta = cpu_buffer->meta_page; > > > + > > > + WRITE_ONCE(meta->entries, local_read(&cpu_buffer->entries)); > > > + WRITE_ONCE(meta->overrun, local_read(&cpu_buffer->overrun)); > > > + WRITE_ONCE(meta->read, cpu_buffer->read); > > > + > > > + WRITE_ONCE(meta->subbufs_touched, local_read(&cpu_buffer->pages_touched)); > > > + WRITE_ONCE(meta->subbufs_lost, local_read(&cpu_buffer->pages_lost)); > > > + WRITE_ONCE(meta->subbufs_read, local_read(&cpu_buffer->pages_read)); > > > + > > > + WRITE_ONCE(meta->reader.read, cpu_buffer->reader_page->read); > > > + WRITE_ONCE(meta->reader.id, cpu_buffer->reader_page->id); > > > + WRITE_ONCE(meta->reader.lost_events, cpu_buffer->lost_events); > > > +} > > > > The lost_events may need to be handled differently, as it doesn't always > > get cleared. So it may be stale data. > > My idea was to check this value after the ioctl(). If > 0 then events were lost > between the that ioctl() and the previous swap. > > But now if with "[PATCH] ring-buffer: Have mmapped ring buffer keep track of > missed events" we put this information in the page itself, we can get rid of > this field. > I'm thinking both may be good, as the number of dropped events are not added if there's no room to put it at the end of the ring buffer. When there's no room, it just sets a flag that there was missed events but doesn't give how many events were missed. If anything, it should be in both locations. In the sub-buffer header, to be consistent with the read/splice version, and in the meta page were if there's no room to add the count, it can be accessed in the meta page. -- Steve
On Fri, 12 Jan 2024 10:06:41 -0500 Steven Rostedt <rostedt@goodmis.org> wrote: > I'm thinking both may be good, as the number of dropped events are not > added if there's no room to put it at the end of the ring buffer. When > there's no room, it just sets a flag that there was missed events but > doesn't give how many events were missed. > > If anything, it should be in both locations. In the sub-buffer header, to > be consistent with the read/splice version, and in the meta page were if > there's no room to add the count, it can be accessed in the meta page. I think the meta data page should look like this: struct trace_buffer_meta { __u32 meta_page_size; __u32 meta_struct_len; __u32 subbuf_size; __u32 nr_subbufs; struct { __u64 lost_events; __u32 id; __u32 read; } reader; __u64 entries; __u64 overrun; __u64 read; }; 1) meta_page_size The size of this meta page. It's the first thing the application needs to know how much to mmap. 2) meta_struct_len The actual length of the structure. It's the second thing the application will look at to know what version it is dealing with. 3) subbuf_size 4) nr_subbufs The next two is the next thing the application should do. That is to memory map in all the sub-buffers. With 1) and this, it knows how to do that. The first four are needed for setup, and never changes once mapped. The rest gets updated during the trace. 5) reader 5a) lost_events 5b) id 5c) read This is actively needed during tracing. It is what is used to know where and how to read the reader sub-buffer. 6) entries 7) overrun 8) read These are useful statistics, but probably seldom really looked at. They should be at the end. Also notice that I converted all "unsigned long" to __u64. This is because it is possible to have a 32 bit userspace running this on top of a 64 bit kernel. If we were to use "long" it would be inconsistent and buggy. Now if you need subbufs_touched and subbufs_lost. When that gets opened, it would update the meta_struct_len accordingly, and the structure would look like: struct trace_buffer_meta { __u32 meta_page_size; __u32 meta_struct_len; __u32 subbuf_size; __u32 nr_subbufs; struct { __u64 lost_events; __u32 id; __u32 read; } reader; __u64 entries; __u64 overrun; __u64 read; __u64 subbufs_touched; __u64 subbufs_lost; }; Make sense? -- Steve
On Thu, 11 Jan 2024 16:17:09 +0000 Vincent Donnefort <vdonnefort@google.com> wrote: > In preparation for allowing the user-space to map a ring-buffer, add > a set of mapping functions: > > ring_buffer_{map,unmap}() > ring_buffer_map_fault() > > And controls on the ring-buffer: > > ring_buffer_map_get_reader() /* swap reader and head */ > > Mapping the ring-buffer also involves: > > A unique ID for each subbuf of the ring-buffer, currently they are > only identified through their in-kernel VA. > > A meta-page, where are stored ring-buffer statistics and a > description for the current reader > > The linear mapping exposes the meta-page, and each subbuf of the > ring-buffer, ordered following their unique ID, assigned during the > first mapping. > > Once mapped, no subbuf can get in or out of the ring-buffer: the buffer > size will remain unmodified and the splice enabling functions will in > reality simply memcpy the data instead of swapping subbufs. > > Signed-off-by: Vincent Donnefort <vdonnefort@google.com> > > diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h > index fa802db216f9..0841ba8bab14 100644 > --- a/include/linux/ring_buffer.h > +++ b/include/linux/ring_buffer.h > @@ -6,6 +6,8 @@ > #include <linux/seq_file.h> > #include <linux/poll.h> > > +#include <uapi/linux/trace_mmap.h> > + > struct trace_buffer; > struct ring_buffer_iter; > > @@ -221,4 +223,9 @@ int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node); > #define trace_rb_cpu_prepare NULL > #endif > > +int ring_buffer_map(struct trace_buffer *buffer, int cpu); > +int ring_buffer_unmap(struct trace_buffer *buffer, int cpu); > +struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu, > + unsigned long pgoff); > +int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu); > #endif /* _LINUX_RING_BUFFER_H */ > diff --git a/include/uapi/linux/trace_mmap.h b/include/uapi/linux/trace_mmap.h > new file mode 100644 > index 000000000000..bde39a73ce65 > --- /dev/null > +++ b/include/uapi/linux/trace_mmap.h > @@ -0,0 +1,45 @@ > +/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */ > +#ifndef _TRACE_MMAP_H_ > +#define _TRACE_MMAP_H_ > + > +#include <linux/types.h> > + > +/** > + * struct trace_buffer_meta - Ring-buffer Meta-page description > + * @entries: Number of entries in the ring-buffer. > + * @overrun: Number of entries lost in the ring-buffer. > + * @read: Number of entries that have been read. > + * @subbufs_touched: Number of subbufs that have been filled. > + * @subbufs_lost: Number of subbufs lost to overrun. > + * @subbufs_read: Number of subbufs that have been read. > + * @reader.lost_events: Number of events lost at the time of the reader swap. > + * @reader.id: subbuf ID of the current reader. From 0 to @nr_subbufs - 1 > + * @reader.read: Number of bytes read on the reader subbuf. > + * @subbuf_size: Size of each subbuf, including the header. > + * @nr_subbufs: Number of subbfs in the ring-buffer. > + * @meta_page_size: Size of this meta-page. > + * @meta_struct_len: Size of this structure. > + */ > +struct trace_buffer_meta { > + unsigned long entries; > + unsigned long overrun; > + unsigned long read; > + > + unsigned long subbufs_touched; > + unsigned long subbufs_lost; > + unsigned long subbufs_read; > + > + struct { > + unsigned long lost_events; > + __u32 id; > + __u32 read; > + } reader; > + > + __u32 subbuf_size; > + __u32 nr_subbufs; > + > + __u32 meta_page_size; > + __u32 meta_struct_len; > +}; > + > +#endif /* _TRACE_MMAP_H_ */ > diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c > index db73e326fa04..e9ff1c95e896 100644 > --- a/kernel/trace/ring_buffer.c > +++ b/kernel/trace/ring_buffer.c > @@ -338,6 +338,7 @@ struct buffer_page { > local_t entries; /* entries on this page */ > unsigned long real_end; /* real end of data */ > unsigned order; /* order of the page */ > + u32 id; /* ID for external mapping */ > struct buffer_data_page *page; /* Actual data page */ > }; > > @@ -484,6 +485,12 @@ struct ring_buffer_per_cpu { > u64 read_stamp; > /* pages removed since last reset */ > unsigned long pages_removed; > + > + int mapped; > + struct mutex mapping_lock; > + unsigned long *subbuf_ids; /* ID to addr */ > + struct trace_buffer_meta *meta_page; > + > /* ring buffer pages to update, > 0 to add, < 0 to remove */ > long nr_pages_to_update; > struct list_head new_pages; /* new pages to add */ > @@ -1542,6 +1549,7 @@ rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu) > init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters); > init_waitqueue_head(&cpu_buffer->irq_work.waiters); > init_waitqueue_head(&cpu_buffer->irq_work.full_waiters); > + mutex_init(&cpu_buffer->mapping_lock); > > bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), > GFP_KERNEL, cpu_to_node(cpu)); > @@ -5160,6 +5168,23 @@ static void rb_clear_buffer_page(struct buffer_page *page) > page->read = 0; > } > > +static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer) > +{ > + struct trace_buffer_meta *meta = cpu_buffer->meta_page; > + > + WRITE_ONCE(meta->entries, local_read(&cpu_buffer->entries)); > + WRITE_ONCE(meta->overrun, local_read(&cpu_buffer->overrun)); > + WRITE_ONCE(meta->read, cpu_buffer->read); > + > + WRITE_ONCE(meta->subbufs_touched, local_read(&cpu_buffer->pages_touched)); > + WRITE_ONCE(meta->subbufs_lost, local_read(&cpu_buffer->pages_lost)); > + WRITE_ONCE(meta->subbufs_read, local_read(&cpu_buffer->pages_read)); > + > + WRITE_ONCE(meta->reader.read, cpu_buffer->reader_page->read); > + WRITE_ONCE(meta->reader.id, cpu_buffer->reader_page->id); > + WRITE_ONCE(meta->reader.lost_events, cpu_buffer->lost_events); > +} > + > static void > rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) > { > @@ -5204,6 +5229,9 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) > cpu_buffer->lost_events = 0; > cpu_buffer->last_overrun = 0; > > + if (cpu_buffer->mapped) > + rb_update_meta_page(cpu_buffer); > + > rb_head_page_activate(cpu_buffer); > cpu_buffer->pages_removed = 0; > } > @@ -5418,6 +5446,11 @@ int ring_buffer_swap_cpu(struct trace_buffer *buffer_a, > cpu_buffer_a = buffer_a->buffers[cpu]; > cpu_buffer_b = buffer_b->buffers[cpu]; > > + if (READ_ONCE(cpu_buffer_a->mapped) || READ_ONCE(cpu_buffer_b->mapped)) { > + ret = -EBUSY; > + goto out; > + } Ah, this is not enough to stop snapshot. update_max_tr()@kernel/trace/trace.c is used for swapping all CPU buffer and it does not use this function. (but that should use this instead of accessing buffer directly...) Maybe we need ring_buffer_swap() and it checks all cpu_buffer does not set mapping bits. Thank you, > + > /* At least make sure the two buffers are somewhat the same */ > if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages) > goto out; > @@ -5682,7 +5715,8 @@ int ring_buffer_read_page(struct trace_buffer *buffer, > * Otherwise, we can simply swap the page with the one passed in. > */ > if (read || (len < (commit - read)) || > - cpu_buffer->reader_page == cpu_buffer->commit_page) { > + cpu_buffer->reader_page == cpu_buffer->commit_page || > + cpu_buffer->mapped) { > struct buffer_data_page *rpage = cpu_buffer->reader_page->page; > unsigned int rpos = read; > unsigned int pos = 0; > @@ -5901,6 +5935,11 @@ int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order) > > cpu_buffer = buffer->buffers[cpu]; > > + if (cpu_buffer->mapped) { > + err = -EBUSY; > + goto error; > + } > + > /* Update the number of pages to match the new size */ > nr_pages = old_size * buffer->buffers[cpu]->nr_pages; > nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size); > @@ -6002,6 +6041,295 @@ int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order) > } > EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set); > > +#define subbuf_page(off, start) \ > + virt_to_page((void *)(start + (off << PAGE_SHIFT))) > + > +#define foreach_subbuf_page(sub_order, start, page) \ > + page = subbuf_page(0, (start)); \ > + for (int __off = 0; __off < (1 << (sub_order)); \ > + __off++, page = subbuf_page(__off, (start))) > + > +static inline void subbuf_map_prepare(unsigned long subbuf_start, int order) > +{ > + struct page *page; > + > + /* > + * When allocating order > 0 pages, only the first struct page has a > + * refcount > 1. Increasing the refcount here ensures none of the struct > + * page composing the sub-buffer is freeed when the mapping is closed. > + */ > + foreach_subbuf_page(order, subbuf_start, page) > + page_ref_inc(page); > +} > + > +static inline void subbuf_unmap(unsigned long subbuf_start, int order) > +{ > + struct page *page; > + > + foreach_subbuf_page(order, subbuf_start, page) { > + page_ref_dec(page); > + page->mapping = NULL; > + } > +} > + > +static void rb_free_subbuf_ids(struct ring_buffer_per_cpu *cpu_buffer) > +{ > + int sub_id; > + > + for (sub_id = 0; sub_id < cpu_buffer->nr_pages + 1; sub_id++) > + subbuf_unmap(cpu_buffer->subbuf_ids[sub_id], > + cpu_buffer->buffer->subbuf_order); > + > + kfree(cpu_buffer->subbuf_ids); > + cpu_buffer->subbuf_ids = NULL; > +} > + > +static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer) > +{ > + if (cpu_buffer->meta_page) > + return 0; > + > + cpu_buffer->meta_page = page_to_virt(alloc_page(GFP_USER | __GFP_ZERO)); > + if (!cpu_buffer->meta_page) > + return -ENOMEM; > + > + return 0; > +} > + > +static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer) > +{ > + unsigned long addr = (unsigned long)cpu_buffer->meta_page; > + > + virt_to_page((void *)addr)->mapping = NULL; > + free_page(addr); > + cpu_buffer->meta_page = NULL; > +} > + > +static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer, > + unsigned long *subbuf_ids) > +{ > + struct trace_buffer_meta *meta = cpu_buffer->meta_page; > + unsigned int nr_subbufs = cpu_buffer->nr_pages + 1; > + struct buffer_page *first_subbuf, *subbuf; > + int id = 0; > + > + subbuf_ids[id] = (unsigned long)cpu_buffer->reader_page->page; > + subbuf_map_prepare(subbuf_ids[id], cpu_buffer->buffer->subbuf_order); > + cpu_buffer->reader_page->id = id++; > + > + first_subbuf = subbuf = rb_set_head_page(cpu_buffer); > + do { > + if (id >= nr_subbufs) { > + WARN_ON(1); > + break; > + } > + > + subbuf_ids[id] = (unsigned long)subbuf->page; > + subbuf->id = id; > + subbuf_map_prepare(subbuf_ids[id], cpu_buffer->buffer->subbuf_order); > + > + rb_inc_page(&subbuf); > + id++; > + } while (subbuf != first_subbuf); > + > + /* install subbuf ID to kern VA translation */ > + cpu_buffer->subbuf_ids = subbuf_ids; > + > + meta->meta_page_size = PAGE_SIZE; > + meta->meta_struct_len = sizeof(*meta); > + meta->nr_subbufs = nr_subbufs; > + meta->subbuf_size = cpu_buffer->buffer->subbuf_size + BUF_PAGE_HDR_SIZE; > + > + rb_update_meta_page(cpu_buffer); > +} > + > +static inline struct ring_buffer_per_cpu * > +rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu) > +{ > + struct ring_buffer_per_cpu *cpu_buffer; > + > + if (!cpumask_test_cpu(cpu, buffer->cpumask)) > + return ERR_PTR(-EINVAL); > + > + cpu_buffer = buffer->buffers[cpu]; > + > + mutex_lock(&cpu_buffer->mapping_lock); > + > + if (!cpu_buffer->mapped) { > + mutex_unlock(&cpu_buffer->mapping_lock); > + return ERR_PTR(-ENODEV); > + } > + > + return cpu_buffer; > +} > + > +static inline void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer) > +{ > + mutex_unlock(&cpu_buffer->mapping_lock); > +} > + > +int ring_buffer_map(struct trace_buffer *buffer, int cpu) > +{ > + struct ring_buffer_per_cpu *cpu_buffer; > + unsigned long flags, *subbuf_ids; > + int err = 0; > + > + if (!cpumask_test_cpu(cpu, buffer->cpumask)) > + return -EINVAL; > + > + cpu_buffer = buffer->buffers[cpu]; > + > + mutex_lock(&cpu_buffer->mapping_lock); > + > + if (cpu_buffer->mapped) { > + if (cpu_buffer->mapped == INT_MAX) > + err = -EBUSY; > + else > + WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped + 1); > + mutex_unlock(&cpu_buffer->mapping_lock); > + return err; > + } > + > + /* prevent another thread from changing buffer sizes */ > + mutex_lock(&buffer->mutex); > + > + err = rb_alloc_meta_page(cpu_buffer); > + if (err) > + goto unlock; > + > + /* subbuf_ids include the reader while nr_pages does not */ > + subbuf_ids = kzalloc(sizeof(*subbuf_ids) * (cpu_buffer->nr_pages + 1), > + GFP_KERNEL); > + if (!subbuf_ids) { > + rb_free_meta_page(cpu_buffer); > + err = -ENOMEM; > + goto unlock; > + } > + > + atomic_inc(&cpu_buffer->resize_disabled); > + > + /* > + * Lock all readers to block any subbuf swap until the subbuf IDs are > + * assigned. > + */ > + raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); > + > + rb_setup_ids_meta_page(cpu_buffer, subbuf_ids); > + > + WRITE_ONCE(cpu_buffer->mapped, 1); > + > + raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); > +unlock: > + mutex_unlock(&buffer->mutex); > + mutex_unlock(&cpu_buffer->mapping_lock); > + > + return err; > +} > + > +int ring_buffer_unmap(struct trace_buffer *buffer, int cpu) > +{ > + struct ring_buffer_per_cpu *cpu_buffer; > + int err = 0; > + > + if (!cpumask_test_cpu(cpu, buffer->cpumask)) > + return -EINVAL; > + > + cpu_buffer = buffer->buffers[cpu]; > + > + mutex_lock(&cpu_buffer->mapping_lock); > + > + if (!cpu_buffer->mapped) { > + err = -ENODEV; > + goto unlock; > + } > + > + WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped - 1); > + if (!cpu_buffer->mapped) { > + /* Wait for the writer and readers to observe !mapped */ > + synchronize_rcu(); > + > + rb_free_subbuf_ids(cpu_buffer); > + rb_free_meta_page(cpu_buffer); > + atomic_dec(&cpu_buffer->resize_disabled); > + } > +unlock: > + mutex_unlock(&cpu_buffer->mapping_lock); > + > + return err; > +} > + > +/* > + * +--------------+ pgoff == 0 > + * | meta page | > + * +--------------+ pgoff == 1 > + * | subbuffer 0 | > + * +--------------+ pgoff == 1 + (1 << subbuf_order) > + * | subbuffer 1 | > + * ... > + */ > +struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu, > + unsigned long pgoff) > +{ > + struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; > + unsigned long subbuf_id, subbuf_offset, addr; > + struct page *page; > + > + if (!pgoff) > + return virt_to_page((void *)cpu_buffer->meta_page); > + > + pgoff--; > + > + subbuf_id = pgoff >> buffer->subbuf_order; > + if (subbuf_id > cpu_buffer->nr_pages) > + return NULL; > + > + subbuf_offset = pgoff & ((1UL << buffer->subbuf_order) - 1); > + addr = cpu_buffer->subbuf_ids[subbuf_id] + (subbuf_offset * PAGE_SIZE); > + page = virt_to_page((void *)addr); > + > + return page; > +} > + > +int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu) > +{ > + struct ring_buffer_per_cpu *cpu_buffer; > + unsigned long reader_size; > + unsigned long flags; > + > + cpu_buffer = rb_get_mapped_buffer(buffer, cpu); > + if (IS_ERR(cpu_buffer)) > + return (int)PTR_ERR(cpu_buffer); > + > + raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); > +consume: > + if (rb_per_cpu_empty(cpu_buffer)) > + goto out; > + > + reader_size = rb_page_size(cpu_buffer->reader_page); > + > + /* > + * There are data to be read on the current reader page, we can > + * return to the caller. But before that, we assume the latter will read > + * everything. Let's update the kernel reader accordingly. > + */ > + if (cpu_buffer->reader_page->read < reader_size) { > + while (cpu_buffer->reader_page->read < reader_size) > + rb_advance_reader(cpu_buffer); > + goto out; > + } > + > + if (WARN_ON(!rb_get_reader_page(cpu_buffer))) > + goto out; > + > + goto consume; > +out: > + rb_update_meta_page(cpu_buffer); > + raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); > + rb_put_mapped_buffer(cpu_buffer); > + > + return 0; > +} > + > /* > * We only allocate new buffers, never free them if the CPU goes down. > * If we were to free the buffer, then the user would lose any trace that was in > -- > 2.43.0.275.g3460e3d667-goog >
On Mon, Jan 15, 2024 at 01:43:03PM +0900, Masami Hiramatsu wrote: > On Thu, 11 Jan 2024 16:17:09 +0000 > Vincent Donnefort <vdonnefort@google.com> wrote: > > > In preparation for allowing the user-space to map a ring-buffer, add > > a set of mapping functions: > > > > ring_buffer_{map,unmap}() > > ring_buffer_map_fault() > > > > And controls on the ring-buffer: > > > > ring_buffer_map_get_reader() /* swap reader and head */ > > > > Mapping the ring-buffer also involves: > > > > A unique ID for each subbuf of the ring-buffer, currently they are > > only identified through their in-kernel VA. > > > > A meta-page, where are stored ring-buffer statistics and a > > description for the current reader > > > > The linear mapping exposes the meta-page, and each subbuf of the > > ring-buffer, ordered following their unique ID, assigned during the > > first mapping. > > > > Once mapped, no subbuf can get in or out of the ring-buffer: the buffer > > size will remain unmodified and the splice enabling functions will in > > reality simply memcpy the data instead of swapping subbufs. > > > > Signed-off-by: Vincent Donnefort <vdonnefort@google.com> > > > > diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h > > index fa802db216f9..0841ba8bab14 100644 > > --- a/include/linux/ring_buffer.h > > +++ b/include/linux/ring_buffer.h > > @@ -6,6 +6,8 @@ > > #include <linux/seq_file.h> > > #include <linux/poll.h> > > > > +#include <uapi/linux/trace_mmap.h> > > + > > struct trace_buffer; > > struct ring_buffer_iter; > > > > @@ -221,4 +223,9 @@ int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node); > > #define trace_rb_cpu_prepare NULL > > #endif > > > > +int ring_buffer_map(struct trace_buffer *buffer, int cpu); > > +int ring_buffer_unmap(struct trace_buffer *buffer, int cpu); > > +struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu, > > + unsigned long pgoff); > > +int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu); > > #endif /* _LINUX_RING_BUFFER_H */ > > diff --git a/include/uapi/linux/trace_mmap.h b/include/uapi/linux/trace_mmap.h > > new file mode 100644 > > index 000000000000..bde39a73ce65 > > --- /dev/null > > +++ b/include/uapi/linux/trace_mmap.h > > @@ -0,0 +1,45 @@ > > +/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */ > > +#ifndef _TRACE_MMAP_H_ > > +#define _TRACE_MMAP_H_ > > + > > +#include <linux/types.h> > > + > > +/** > > + * struct trace_buffer_meta - Ring-buffer Meta-page description > > + * @entries: Number of entries in the ring-buffer. > > + * @overrun: Number of entries lost in the ring-buffer. > > + * @read: Number of entries that have been read. > > + * @subbufs_touched: Number of subbufs that have been filled. > > + * @subbufs_lost: Number of subbufs lost to overrun. > > + * @subbufs_read: Number of subbufs that have been read. > > + * @reader.lost_events: Number of events lost at the time of the reader swap. > > + * @reader.id: subbuf ID of the current reader. From 0 to @nr_subbufs - 1 > > + * @reader.read: Number of bytes read on the reader subbuf. > > + * @subbuf_size: Size of each subbuf, including the header. > > + * @nr_subbufs: Number of subbfs in the ring-buffer. > > + * @meta_page_size: Size of this meta-page. > > + * @meta_struct_len: Size of this structure. > > + */ > > +struct trace_buffer_meta { > > + unsigned long entries; > > + unsigned long overrun; > > + unsigned long read; > > + > > + unsigned long subbufs_touched; > > + unsigned long subbufs_lost; > > + unsigned long subbufs_read; > > + > > + struct { > > + unsigned long lost_events; > > + __u32 id; > > + __u32 read; > > + } reader; > > + > > + __u32 subbuf_size; > > + __u32 nr_subbufs; > > + > > + __u32 meta_page_size; > > + __u32 meta_struct_len; > > +}; > > + > > +#endif /* _TRACE_MMAP_H_ */ > > diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c > > index db73e326fa04..e9ff1c95e896 100644 > > --- a/kernel/trace/ring_buffer.c > > +++ b/kernel/trace/ring_buffer.c > > @@ -338,6 +338,7 @@ struct buffer_page { > > local_t entries; /* entries on this page */ > > unsigned long real_end; /* real end of data */ > > unsigned order; /* order of the page */ > > + u32 id; /* ID for external mapping */ > > struct buffer_data_page *page; /* Actual data page */ > > }; > > > > @@ -484,6 +485,12 @@ struct ring_buffer_per_cpu { > > u64 read_stamp; > > /* pages removed since last reset */ > > unsigned long pages_removed; > > + > > + int mapped; > > + struct mutex mapping_lock; > > + unsigned long *subbuf_ids; /* ID to addr */ > > + struct trace_buffer_meta *meta_page; > > + > > /* ring buffer pages to update, > 0 to add, < 0 to remove */ > > long nr_pages_to_update; > > struct list_head new_pages; /* new pages to add */ > > @@ -1542,6 +1549,7 @@ rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu) > > init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters); > > init_waitqueue_head(&cpu_buffer->irq_work.waiters); > > init_waitqueue_head(&cpu_buffer->irq_work.full_waiters); > > + mutex_init(&cpu_buffer->mapping_lock); > > > > bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), > > GFP_KERNEL, cpu_to_node(cpu)); > > @@ -5160,6 +5168,23 @@ static void rb_clear_buffer_page(struct buffer_page *page) > > page->read = 0; > > } > > > > +static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer) > > +{ > > + struct trace_buffer_meta *meta = cpu_buffer->meta_page; > > + > > + WRITE_ONCE(meta->entries, local_read(&cpu_buffer->entries)); > > + WRITE_ONCE(meta->overrun, local_read(&cpu_buffer->overrun)); > > + WRITE_ONCE(meta->read, cpu_buffer->read); > > + > > + WRITE_ONCE(meta->subbufs_touched, local_read(&cpu_buffer->pages_touched)); > > + WRITE_ONCE(meta->subbufs_lost, local_read(&cpu_buffer->pages_lost)); > > + WRITE_ONCE(meta->subbufs_read, local_read(&cpu_buffer->pages_read)); > > + > > + WRITE_ONCE(meta->reader.read, cpu_buffer->reader_page->read); > > + WRITE_ONCE(meta->reader.id, cpu_buffer->reader_page->id); > > + WRITE_ONCE(meta->reader.lost_events, cpu_buffer->lost_events); > > +} > > + > > static void > > rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) > > { > > @@ -5204,6 +5229,9 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) > > cpu_buffer->lost_events = 0; > > cpu_buffer->last_overrun = 0; > > > > + if (cpu_buffer->mapped) > > + rb_update_meta_page(cpu_buffer); > > + > > rb_head_page_activate(cpu_buffer); > > cpu_buffer->pages_removed = 0; > > } > > @@ -5418,6 +5446,11 @@ int ring_buffer_swap_cpu(struct trace_buffer *buffer_a, > > cpu_buffer_a = buffer_a->buffers[cpu]; > > cpu_buffer_b = buffer_b->buffers[cpu]; > > > > + if (READ_ONCE(cpu_buffer_a->mapped) || READ_ONCE(cpu_buffer_b->mapped)) { > > + ret = -EBUSY; > > + goto out; > > + } > > Ah, this is not enough to stop snapshot. update_max_tr()@kernel/trace/trace.c > is used for swapping all CPU buffer and it does not use this function. > (but that should use this instead of accessing buffer directly...) > > Maybe we need ring_buffer_swap() and it checks all cpu_buffer does not set > mapping bits. > > Thank you, How about instead of having ring_buffer_swap_cpu() returning -EBUSY when mapped we have two functions to block any mapping that trace.c could call? int rb_swap_prepare(struct ring_buffer *cpu_buffer) { mutex_lock(&cpu_buffer->mapping_lock); if (!cpu_buffer->mapped) return 0; mutex_unlock(&cpu_buffer->mapping_lock); } int rb_swap_done(struct ring_buffer *cpu_buffer) { mutex_unlock(&cpu_buffer->mapping_lock); } int ring_buffer_swap_prepare(struct trace_buffer *buffer, int cpu) { ... if (cpu == RING_BUFFER_ALL_CPUS) { int cpu_i; for_each_buffer_cpu(buffer, cpu_i) { err = rb_swap_prepare(buffer->buffers[cpu_i]); if (err) break; } if (err) /* Rollback ... */ return err; } return rb_swap_prepare(buffer->buffers[cpu]); } void ring_buffer_swap_done(struct trace_buffer *buffer, int cpu) { if (cpu == RING_BUFFER_ALL_CPUS) { int cpu_i; for_each_buffer_cpu(buffer, cpu_i) rb_swap_done(buffer->buffers[cpu_i]); return; } return rb_swap_done(buffer->buffers[cpu]); } [...]
On Mon, 15 Jan 2024 15:37:31 +0000 Vincent Donnefort <vdonnefort@google.com> wrote: > > > @@ -5418,6 +5446,11 @@ int ring_buffer_swap_cpu(struct trace_buffer *buffer_a, > > > cpu_buffer_a = buffer_a->buffers[cpu]; > > > cpu_buffer_b = buffer_b->buffers[cpu]; > > > > > > + if (READ_ONCE(cpu_buffer_a->mapped) || READ_ONCE(cpu_buffer_b->mapped)) { > > > + ret = -EBUSY; > > > + goto out; > > > + } > > > > Ah, this is not enough to stop snapshot. update_max_tr()@kernel/trace/trace.c > > is used for swapping all CPU buffer and it does not use this function. > > (but that should use this instead of accessing buffer directly...) > > > > Maybe we need ring_buffer_swap() and it checks all cpu_buffer does not set > > mapping bits. > > > > Thank you, > > How about instead of having ring_buffer_swap_cpu() returning -EBUSY when mapped > we have two functions to block any mapping that trace.c could call? > No. The ring buffer logic should not care if the user of it is swapping the entire ring buffer or not. It only cares if parts of the ring buffer is being swapped or not. That's not the level of scope it should care about. If we do not want a swap to happen in update_max_tr() that's not ring_buffer.c's problem. The code to prevent that from happening should be 100% in trace.c. The trace.c code knows what's being mapped or not. The accounting to keep a mapped buffer from being swapped should stay in trace.c, and not add unnecessary implementation coupling between that and ring_buffer.c. -- Steve
On Mon, 15 Jan 2024 11:09:38 -0500 Steven Rostedt <rostedt@goodmis.org> wrote: > No. The ring buffer logic should not care if the user of it is swapping > the entire ring buffer or not. It only cares if parts of the ring > buffer is being swapped or not. That's not the level of scope it should > care about. If we do not want a swap to happen in update_max_tr() > that's not ring_buffer.c's problem. The code to prevent that from > happening should be 100% in trace.c. What needs to be done, and feel free to add this as a separate patch, is to have checks where snapshot is used. (All errors return -EBUSY) Before allowing mapping, check to see if: 1) the current tracer has "use_max_tr" set. 2) any event has a "snapshot" trigger set 3) Any function has a "snapshot" command set Fail if any of the above is true. Also in reverse, if the buffer is mapped, then fail: 1) a tracer being set that has "use_max_tr" set. 2) a "snapshot" command being set on a function 3) a "snapshot" trigger being set on an event. For the last two, we may be able to get away with just a below as well. Adding the tr->flags bit. We could also add a tr->snapshot count to keep track of everything that is using a snapshot, and if that count is non-zero, mapping fails. diff --git a/kernel/trace/trace.c b/kernel/trace/trace.c index 2a7c6fd934e9..f534f74ae80f 100644 --- a/kernel/trace/trace.c +++ b/kernel/trace/trace.c @@ -1175,6 +1175,12 @@ static void tracing_snapshot_instance_cond(struct trace_array *tr, return; } + if (tr->flags & TRACE_ARRAY_FL_MAPPED) { + trace_array_puts(tr, "*** BUFFER IS MEMORY MAPPED ***\n"); + trace_array_puts(tr, "*** Can not use snapshot (sorry) ***\n"); + return; + } + local_irq_save(flags); update_max_tr(tr, current, smp_processor_id(), cond_data); local_irq_restore(flags); -- Steve
On Mon, Jan 15, 2024 at 11:23:59AM -0500, Steven Rostedt wrote: > On Mon, 15 Jan 2024 11:09:38 -0500 > Steven Rostedt <rostedt@goodmis.org> wrote: > > > No. The ring buffer logic should not care if the user of it is swapping > > the entire ring buffer or not. It only cares if parts of the ring > > buffer is being swapped or not. That's not the level of scope it should > > care about. If we do not want a swap to happen in update_max_tr() > > that's not ring_buffer.c's problem. The code to prevent that from > > happening should be 100% in trace.c. > > What needs to be done, and feel free to add this as a separate patch, > is to have checks where snapshot is used. > > (All errors return -EBUSY) > > Before allowing mapping, check to see if: > > 1) the current tracer has "use_max_tr" set. > 2) any event has a "snapshot" trigger set > 3) Any function has a "snapshot" command set Could we sum-up this with a single check to allocate_snapshot? If that is allocated it's probably because we'll be using it? That would simply add the requirement to echo 0 > snapshot before starting the memory map? The opposite could be to let tracing_alloc_snapshot_instance() fail whenever a mapping is in place? > > Fail if any of the above is true. > > Also in reverse, if the buffer is mapped, then fail: > > 1) a tracer being set that has "use_max_tr" set. > 2) a "snapshot" command being set on a function > 3) a "snapshot" trigger being set on an event. > > For the last two, we may be able to get away with just a below as well. > Adding the tr->flags bit. We could also add a tr->snapshot count to > keep track of everything that is using a snapshot, and if that count is > non-zero, mapping fails. > > diff --git a/kernel/trace/trace.c b/kernel/trace/trace.c > index 2a7c6fd934e9..f534f74ae80f 100644 > --- a/kernel/trace/trace.c > +++ b/kernel/trace/trace.c > @@ -1175,6 +1175,12 @@ static void tracing_snapshot_instance_cond(struct trace_array *tr, > return; > } > > + if (tr->flags & TRACE_ARRAY_FL_MAPPED) { > + trace_array_puts(tr, "*** BUFFER IS MEMORY MAPPED ***\n"); > + trace_array_puts(tr, "*** Can not use snapshot (sorry) ***\n"); > + return; > + } > + > local_irq_save(flags); > update_max_tr(tr, current, smp_processor_id(), cond_data); > local_irq_restore(flags); > > > -- Steve > > -- > To unsubscribe from this group and stop receiving emails from it, send an email to kernel-team+unsubscribe@android.com. >
On Mon, 15 Jan 2024 17:29:09 +0000 Vincent Donnefort <vdonnefort@google.com> wrote: > > > > What needs to be done, and feel free to add this as a separate patch, > > is to have checks where snapshot is used. > > > > (All errors return -EBUSY) > > > > Before allowing mapping, check to see if: > > > > 1) the current tracer has "use_max_tr" set. > > 2) any event has a "snapshot" trigger set > > 3) Any function has a "snapshot" command set > > Could we sum-up this with a single check to allocate_snapshot? If that is > allocated it's probably because we'll be using it? Not always. It can be allocated at any time and never used. I'd like to keep the allocation of the snapshot buffer agnostic to if the buffer is mapped or not, especially since it can be allocated at boot up and never used. Several of my boxes have "alloc_snapshot" on the command line even though I don't always use it. But we could update the output of reading the "snapshot" file to: ~# cat /sys/kernel/tracing/snapshot # tracer: nop # # # ** Snapshot disabled due to the buffer being memory mapped ** # # * Snapshot is freed * # # Snapshot commands: # echo 0 > snapshot : Clears and frees snapshot buffer # echo 1 > snapshot : Allocates snapshot buffer, if not already allocated. # Takes a snapshot of the main buffer. # echo 2 > snapshot : Clears snapshot buffer (but does not allocate or free) # (Doesn't have to be '2' works with any number that # is not a '0' or '1') If it is allocated: ~# cat /sys/kernel/tracing/snapshot # tracer: nop # # ** Snapshot disabled due to the buffer being memory mapped ** # # * Snapshot is allocated * # # Snapshot commands: # echo 0 > snapshot : Clears and frees snapshot buffer # echo 1 > snapshot : Allocates snapshot buffer, if not already allocated. # Takes a snapshot of the main buffer. # echo 2 > snapshot : Clears snapshot buffer (but does not allocate or free) # (Doesn't have to be '2' works with any number that # is not a '0' or '1') > > That would simply add the requirement to echo 0 > snapshot before starting the > memory map? I rather not depend on that. It just makes it more complex for why mapping failed. If you get -EBUSY, it will be hard to know why. > > The opposite could be to let tracing_alloc_snapshot_instance() fail whenever a > mapping is in place? Yes, that would fail if mapping is in place. Because the snapshot being allocated can be something people want, as it allows the snapshot to happen immediately when needed, I don't want the fact that it is allocated to prevent mapping. As mapping may just happen for a small period of time while an application is running. -- Steve
On Mon, 15 Jan 2024 11:23:59 -0500 Steven Rostedt <rostedt@goodmis.org> wrote: > On Mon, 15 Jan 2024 11:09:38 -0500 > Steven Rostedt <rostedt@goodmis.org> wrote: > > > No. The ring buffer logic should not care if the user of it is swapping > > the entire ring buffer or not. It only cares if parts of the ring > > buffer is being swapped or not. That's not the level of scope it should > > care about. If we do not want a swap to happen in update_max_tr() > > that's not ring_buffer.c's problem. The code to prevent that from > > happening should be 100% in trace.c. > > What needs to be done, and feel free to add this as a separate patch, > is to have checks where snapshot is used. > > (All errors return -EBUSY) > > Before allowing mapping, check to see if: > > 1) the current tracer has "use_max_tr" set. > 2) any event has a "snapshot" trigger set > 3) Any function has a "snapshot" command set > > Fail if any of the above is true. > > Also in reverse, if the buffer is mapped, then fail: > > 1) a tracer being set that has "use_max_tr" set. > 2) a "snapshot" command being set on a function > 3) a "snapshot" trigger being set on an event. > > For the last two, we may be able to get away with just a below as well. > Adding the tr->flags bit. We could also add a tr->snapshot count to > keep track of everything that is using a snapshot, and if that count is > non-zero, mapping fails. BTW, if we allow mapping per-cpu ring buffer, we may need "tr->mapped" counter in addition to per-cpu mapped bit. Then we can just check tr->snapshot at mapping, and tr->mapped at preparing snapshot. Thank you, > > diff --git a/kernel/trace/trace.c b/kernel/trace/trace.c > index 2a7c6fd934e9..f534f74ae80f 100644 > --- a/kernel/trace/trace.c > +++ b/kernel/trace/trace.c > @@ -1175,6 +1175,12 @@ static void tracing_snapshot_instance_cond(struct trace_array *tr, > return; > } > > + if (tr->flags & TRACE_ARRAY_FL_MAPPED) { > + trace_array_puts(tr, "*** BUFFER IS MEMORY MAPPED ***\n"); > + trace_array_puts(tr, "*** Can not use snapshot (sorry) ***\n"); > + return; > + } > + > local_irq_save(flags); > update_max_tr(tr, current, smp_processor_id(), cond_data); > local_irq_restore(flags); > > > -- Steve
diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h index fa802db216f9..0841ba8bab14 100644 --- a/include/linux/ring_buffer.h +++ b/include/linux/ring_buffer.h @@ -6,6 +6,8 @@ #include <linux/seq_file.h> #include <linux/poll.h> +#include <uapi/linux/trace_mmap.h> + struct trace_buffer; struct ring_buffer_iter; @@ -221,4 +223,9 @@ int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node); #define trace_rb_cpu_prepare NULL #endif +int ring_buffer_map(struct trace_buffer *buffer, int cpu); +int ring_buffer_unmap(struct trace_buffer *buffer, int cpu); +struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu, + unsigned long pgoff); +int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu); #endif /* _LINUX_RING_BUFFER_H */ diff --git a/include/uapi/linux/trace_mmap.h b/include/uapi/linux/trace_mmap.h new file mode 100644 index 000000000000..bde39a73ce65 --- /dev/null +++ b/include/uapi/linux/trace_mmap.h @@ -0,0 +1,45 @@ +/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */ +#ifndef _TRACE_MMAP_H_ +#define _TRACE_MMAP_H_ + +#include <linux/types.h> + +/** + * struct trace_buffer_meta - Ring-buffer Meta-page description + * @entries: Number of entries in the ring-buffer. + * @overrun: Number of entries lost in the ring-buffer. + * @read: Number of entries that have been read. + * @subbufs_touched: Number of subbufs that have been filled. + * @subbufs_lost: Number of subbufs lost to overrun. + * @subbufs_read: Number of subbufs that have been read. + * @reader.lost_events: Number of events lost at the time of the reader swap. + * @reader.id: subbuf ID of the current reader. From 0 to @nr_subbufs - 1 + * @reader.read: Number of bytes read on the reader subbuf. + * @subbuf_size: Size of each subbuf, including the header. + * @nr_subbufs: Number of subbfs in the ring-buffer. + * @meta_page_size: Size of this meta-page. + * @meta_struct_len: Size of this structure. + */ +struct trace_buffer_meta { + unsigned long entries; + unsigned long overrun; + unsigned long read; + + unsigned long subbufs_touched; + unsigned long subbufs_lost; + unsigned long subbufs_read; + + struct { + unsigned long lost_events; + __u32 id; + __u32 read; + } reader; + + __u32 subbuf_size; + __u32 nr_subbufs; + + __u32 meta_page_size; + __u32 meta_struct_len; +}; + +#endif /* _TRACE_MMAP_H_ */ diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c index db73e326fa04..e9ff1c95e896 100644 --- a/kernel/trace/ring_buffer.c +++ b/kernel/trace/ring_buffer.c @@ -338,6 +338,7 @@ struct buffer_page { local_t entries; /* entries on this page */ unsigned long real_end; /* real end of data */ unsigned order; /* order of the page */ + u32 id; /* ID for external mapping */ struct buffer_data_page *page; /* Actual data page */ }; @@ -484,6 +485,12 @@ struct ring_buffer_per_cpu { u64 read_stamp; /* pages removed since last reset */ unsigned long pages_removed; + + int mapped; + struct mutex mapping_lock; + unsigned long *subbuf_ids; /* ID to addr */ + struct trace_buffer_meta *meta_page; + /* ring buffer pages to update, > 0 to add, < 0 to remove */ long nr_pages_to_update; struct list_head new_pages; /* new pages to add */ @@ -1542,6 +1549,7 @@ rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu) init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters); init_waitqueue_head(&cpu_buffer->irq_work.waiters); init_waitqueue_head(&cpu_buffer->irq_work.full_waiters); + mutex_init(&cpu_buffer->mapping_lock); bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()), GFP_KERNEL, cpu_to_node(cpu)); @@ -5160,6 +5168,23 @@ static void rb_clear_buffer_page(struct buffer_page *page) page->read = 0; } +static void rb_update_meta_page(struct ring_buffer_per_cpu *cpu_buffer) +{ + struct trace_buffer_meta *meta = cpu_buffer->meta_page; + + WRITE_ONCE(meta->entries, local_read(&cpu_buffer->entries)); + WRITE_ONCE(meta->overrun, local_read(&cpu_buffer->overrun)); + WRITE_ONCE(meta->read, cpu_buffer->read); + + WRITE_ONCE(meta->subbufs_touched, local_read(&cpu_buffer->pages_touched)); + WRITE_ONCE(meta->subbufs_lost, local_read(&cpu_buffer->pages_lost)); + WRITE_ONCE(meta->subbufs_read, local_read(&cpu_buffer->pages_read)); + + WRITE_ONCE(meta->reader.read, cpu_buffer->reader_page->read); + WRITE_ONCE(meta->reader.id, cpu_buffer->reader_page->id); + WRITE_ONCE(meta->reader.lost_events, cpu_buffer->lost_events); +} + static void rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) { @@ -5204,6 +5229,9 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer) cpu_buffer->lost_events = 0; cpu_buffer->last_overrun = 0; + if (cpu_buffer->mapped) + rb_update_meta_page(cpu_buffer); + rb_head_page_activate(cpu_buffer); cpu_buffer->pages_removed = 0; } @@ -5418,6 +5446,11 @@ int ring_buffer_swap_cpu(struct trace_buffer *buffer_a, cpu_buffer_a = buffer_a->buffers[cpu]; cpu_buffer_b = buffer_b->buffers[cpu]; + if (READ_ONCE(cpu_buffer_a->mapped) || READ_ONCE(cpu_buffer_b->mapped)) { + ret = -EBUSY; + goto out; + } + /* At least make sure the two buffers are somewhat the same */ if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages) goto out; @@ -5682,7 +5715,8 @@ int ring_buffer_read_page(struct trace_buffer *buffer, * Otherwise, we can simply swap the page with the one passed in. */ if (read || (len < (commit - read)) || - cpu_buffer->reader_page == cpu_buffer->commit_page) { + cpu_buffer->reader_page == cpu_buffer->commit_page || + cpu_buffer->mapped) { struct buffer_data_page *rpage = cpu_buffer->reader_page->page; unsigned int rpos = read; unsigned int pos = 0; @@ -5901,6 +5935,11 @@ int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order) cpu_buffer = buffer->buffers[cpu]; + if (cpu_buffer->mapped) { + err = -EBUSY; + goto error; + } + /* Update the number of pages to match the new size */ nr_pages = old_size * buffer->buffers[cpu]->nr_pages; nr_pages = DIV_ROUND_UP(nr_pages, buffer->subbuf_size); @@ -6002,6 +6041,295 @@ int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order) } EXPORT_SYMBOL_GPL(ring_buffer_subbuf_order_set); +#define subbuf_page(off, start) \ + virt_to_page((void *)(start + (off << PAGE_SHIFT))) + +#define foreach_subbuf_page(sub_order, start, page) \ + page = subbuf_page(0, (start)); \ + for (int __off = 0; __off < (1 << (sub_order)); \ + __off++, page = subbuf_page(__off, (start))) + +static inline void subbuf_map_prepare(unsigned long subbuf_start, int order) +{ + struct page *page; + + /* + * When allocating order > 0 pages, only the first struct page has a + * refcount > 1. Increasing the refcount here ensures none of the struct + * page composing the sub-buffer is freeed when the mapping is closed. + */ + foreach_subbuf_page(order, subbuf_start, page) + page_ref_inc(page); +} + +static inline void subbuf_unmap(unsigned long subbuf_start, int order) +{ + struct page *page; + + foreach_subbuf_page(order, subbuf_start, page) { + page_ref_dec(page); + page->mapping = NULL; + } +} + +static void rb_free_subbuf_ids(struct ring_buffer_per_cpu *cpu_buffer) +{ + int sub_id; + + for (sub_id = 0; sub_id < cpu_buffer->nr_pages + 1; sub_id++) + subbuf_unmap(cpu_buffer->subbuf_ids[sub_id], + cpu_buffer->buffer->subbuf_order); + + kfree(cpu_buffer->subbuf_ids); + cpu_buffer->subbuf_ids = NULL; +} + +static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer) +{ + if (cpu_buffer->meta_page) + return 0; + + cpu_buffer->meta_page = page_to_virt(alloc_page(GFP_USER | __GFP_ZERO)); + if (!cpu_buffer->meta_page) + return -ENOMEM; + + return 0; +} + +static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer) +{ + unsigned long addr = (unsigned long)cpu_buffer->meta_page; + + virt_to_page((void *)addr)->mapping = NULL; + free_page(addr); + cpu_buffer->meta_page = NULL; +} + +static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer, + unsigned long *subbuf_ids) +{ + struct trace_buffer_meta *meta = cpu_buffer->meta_page; + unsigned int nr_subbufs = cpu_buffer->nr_pages + 1; + struct buffer_page *first_subbuf, *subbuf; + int id = 0; + + subbuf_ids[id] = (unsigned long)cpu_buffer->reader_page->page; + subbuf_map_prepare(subbuf_ids[id], cpu_buffer->buffer->subbuf_order); + cpu_buffer->reader_page->id = id++; + + first_subbuf = subbuf = rb_set_head_page(cpu_buffer); + do { + if (id >= nr_subbufs) { + WARN_ON(1); + break; + } + + subbuf_ids[id] = (unsigned long)subbuf->page; + subbuf->id = id; + subbuf_map_prepare(subbuf_ids[id], cpu_buffer->buffer->subbuf_order); + + rb_inc_page(&subbuf); + id++; + } while (subbuf != first_subbuf); + + /* install subbuf ID to kern VA translation */ + cpu_buffer->subbuf_ids = subbuf_ids; + + meta->meta_page_size = PAGE_SIZE; + meta->meta_struct_len = sizeof(*meta); + meta->nr_subbufs = nr_subbufs; + meta->subbuf_size = cpu_buffer->buffer->subbuf_size + BUF_PAGE_HDR_SIZE; + + rb_update_meta_page(cpu_buffer); +} + +static inline struct ring_buffer_per_cpu * +rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu) +{ + struct ring_buffer_per_cpu *cpu_buffer; + + if (!cpumask_test_cpu(cpu, buffer->cpumask)) + return ERR_PTR(-EINVAL); + + cpu_buffer = buffer->buffers[cpu]; + + mutex_lock(&cpu_buffer->mapping_lock); + + if (!cpu_buffer->mapped) { + mutex_unlock(&cpu_buffer->mapping_lock); + return ERR_PTR(-ENODEV); + } + + return cpu_buffer; +} + +static inline void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer) +{ + mutex_unlock(&cpu_buffer->mapping_lock); +} + +int ring_buffer_map(struct trace_buffer *buffer, int cpu) +{ + struct ring_buffer_per_cpu *cpu_buffer; + unsigned long flags, *subbuf_ids; + int err = 0; + + if (!cpumask_test_cpu(cpu, buffer->cpumask)) + return -EINVAL; + + cpu_buffer = buffer->buffers[cpu]; + + mutex_lock(&cpu_buffer->mapping_lock); + + if (cpu_buffer->mapped) { + if (cpu_buffer->mapped == INT_MAX) + err = -EBUSY; + else + WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped + 1); + mutex_unlock(&cpu_buffer->mapping_lock); + return err; + } + + /* prevent another thread from changing buffer sizes */ + mutex_lock(&buffer->mutex); + + err = rb_alloc_meta_page(cpu_buffer); + if (err) + goto unlock; + + /* subbuf_ids include the reader while nr_pages does not */ + subbuf_ids = kzalloc(sizeof(*subbuf_ids) * (cpu_buffer->nr_pages + 1), + GFP_KERNEL); + if (!subbuf_ids) { + rb_free_meta_page(cpu_buffer); + err = -ENOMEM; + goto unlock; + } + + atomic_inc(&cpu_buffer->resize_disabled); + + /* + * Lock all readers to block any subbuf swap until the subbuf IDs are + * assigned. + */ + raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); + + rb_setup_ids_meta_page(cpu_buffer, subbuf_ids); + + WRITE_ONCE(cpu_buffer->mapped, 1); + + raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); +unlock: + mutex_unlock(&buffer->mutex); + mutex_unlock(&cpu_buffer->mapping_lock); + + return err; +} + +int ring_buffer_unmap(struct trace_buffer *buffer, int cpu) +{ + struct ring_buffer_per_cpu *cpu_buffer; + int err = 0; + + if (!cpumask_test_cpu(cpu, buffer->cpumask)) + return -EINVAL; + + cpu_buffer = buffer->buffers[cpu]; + + mutex_lock(&cpu_buffer->mapping_lock); + + if (!cpu_buffer->mapped) { + err = -ENODEV; + goto unlock; + } + + WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped - 1); + if (!cpu_buffer->mapped) { + /* Wait for the writer and readers to observe !mapped */ + synchronize_rcu(); + + rb_free_subbuf_ids(cpu_buffer); + rb_free_meta_page(cpu_buffer); + atomic_dec(&cpu_buffer->resize_disabled); + } +unlock: + mutex_unlock(&cpu_buffer->mapping_lock); + + return err; +} + +/* + * +--------------+ pgoff == 0 + * | meta page | + * +--------------+ pgoff == 1 + * | subbuffer 0 | + * +--------------+ pgoff == 1 + (1 << subbuf_order) + * | subbuffer 1 | + * ... + */ +struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu, + unsigned long pgoff) +{ + struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu]; + unsigned long subbuf_id, subbuf_offset, addr; + struct page *page; + + if (!pgoff) + return virt_to_page((void *)cpu_buffer->meta_page); + + pgoff--; + + subbuf_id = pgoff >> buffer->subbuf_order; + if (subbuf_id > cpu_buffer->nr_pages) + return NULL; + + subbuf_offset = pgoff & ((1UL << buffer->subbuf_order) - 1); + addr = cpu_buffer->subbuf_ids[subbuf_id] + (subbuf_offset * PAGE_SIZE); + page = virt_to_page((void *)addr); + + return page; +} + +int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu) +{ + struct ring_buffer_per_cpu *cpu_buffer; + unsigned long reader_size; + unsigned long flags; + + cpu_buffer = rb_get_mapped_buffer(buffer, cpu); + if (IS_ERR(cpu_buffer)) + return (int)PTR_ERR(cpu_buffer); + + raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags); +consume: + if (rb_per_cpu_empty(cpu_buffer)) + goto out; + + reader_size = rb_page_size(cpu_buffer->reader_page); + + /* + * There are data to be read on the current reader page, we can + * return to the caller. But before that, we assume the latter will read + * everything. Let's update the kernel reader accordingly. + */ + if (cpu_buffer->reader_page->read < reader_size) { + while (cpu_buffer->reader_page->read < reader_size) + rb_advance_reader(cpu_buffer); + goto out; + } + + if (WARN_ON(!rb_get_reader_page(cpu_buffer))) + goto out; + + goto consume; +out: + rb_update_meta_page(cpu_buffer); + raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags); + rb_put_mapped_buffer(cpu_buffer); + + return 0; +} + /* * We only allocate new buffers, never free them if the CPU goes down. * If we were to free the buffer, then the user would lose any trace that was in
In preparation for allowing the user-space to map a ring-buffer, add a set of mapping functions: ring_buffer_{map,unmap}() ring_buffer_map_fault() And controls on the ring-buffer: ring_buffer_map_get_reader() /* swap reader and head */ Mapping the ring-buffer also involves: A unique ID for each subbuf of the ring-buffer, currently they are only identified through their in-kernel VA. A meta-page, where are stored ring-buffer statistics and a description for the current reader The linear mapping exposes the meta-page, and each subbuf of the ring-buffer, ordered following their unique ID, assigned during the first mapping. Once mapped, no subbuf can get in or out of the ring-buffer: the buffer size will remain unmodified and the splice enabling functions will in reality simply memcpy the data instead of swapping subbufs. Signed-off-by: Vincent Donnefort <vdonnefort@google.com>