diff mbox series

[v5,1/2] ring-buffer: Introducing ring-buffer mapping functions

Message ID 20230728164754.460767-2-vdonnefort@google.com (mailing list archive)
State Superseded
Headers show
Series Introducing trace buffer mapping by user-space | expand

Commit Message

Vincent Donnefort July 28, 2023, 4:47 p.m. UTC
In preparation for allowing the user-space to map a ring-buffer, add
a set of mapping functions:

  ring_buffer_{map,unmap}()
  ring_buffer_map_fault()

And controls on the ring-buffer:

  ring_buffer_map_get_reader_page()  /* swap reader and head */

Mapping the ring-buffer also involves:

  A unique ID for each page of the ring-buffer, as currently the pages
  are only identified through their in-kernel VA.

  A meta-page, where are stored statistics about the ring-buffer and
  a page IDs list, ordered. A field gives what page is the reader
  one and one to gives where the ring-buffer starts in the list of data
  pages.

The linear mapping exposes the meta-page, and each page of the
ring-buffer, ordered following their unique ID, assigned during the
first mapping.

Once mapped, no page can get in or out of the ring-buffer: the buffer
size will remain unmodified and the splice enabling functions will in
reality simply memcpy the data instead of swapping pages.

Signed-off-by: Vincent Donnefort <vdonnefort@google.com>

Comments

kernel test robot July 29, 2023, 1:09 a.m. UTC | #1
Hi Vincent,

kernel test robot noticed the following build errors:

[auto build test ERROR on linus/master]
[cannot apply to rostedt-trace/for-next rostedt-trace/for-next-urgent]
[If your patch is applied to the wrong git tree, kindly drop us a note.
And when submitting patch, we suggest to use '--base' as documented in
https://git-scm.com/docs/git-format-patch#_base_tree_information]

url:    https://github.com/intel-lab-lkp/linux/commits/Vincent-Donnefort/ring-buffer-Introducing-ring-buffer-mapping-functions/20230729-005300
base:   linus/master
patch link:    https://lore.kernel.org/r/20230728164754.460767-2-vdonnefort%40google.com
patch subject: [PATCH v5 1/2] ring-buffer: Introducing ring-buffer mapping functions
config: arm-randconfig-r033-20230728 (https://download.01.org/0day-ci/archive/20230729/202307290828.WNBmhbTA-lkp@intel.com/config)
compiler: clang version 17.0.0 (https://github.com/llvm/llvm-project.git 4a5ac14ee968ff0ad5d2cc1ffa0299048db4c88a)
reproduce: (https://download.01.org/0day-ci/archive/20230729/202307290828.WNBmhbTA-lkp@intel.com/reproduce)

If you fix the issue in a separate patch/commit (i.e. not just a new version of
the same patch/commit), kindly add following tags
| Reported-by: kernel test robot <lkp@intel.com>
| Closes: https://lore.kernel.org/oe-kbuild-all/202307290828.WNBmhbTA-lkp@intel.com/

All errors (new ones prefixed by >>):

>> kernel/trace/ring_buffer.c:5946:16: error: incompatible integer to pointer conversion passing 'unsigned long' to parameter of type 'const void *' [-Wint-conversion]
    5946 |                 virt_to_page(cpu_buffer->page_ids[i])->mapping = NULL;
         |                              ^~~~~~~~~~~~~~~~~~~~~~~
   arch/arm/include/asm/memory.h:390:53: note: expanded from macro 'virt_to_page'
     390 | #define virt_to_page(kaddr)     pfn_to_page(virt_to_pfn(kaddr))
         |                                                         ^~~~~
   include/asm-generic/memory_model.h:18:41: note: expanded from macro '__pfn_to_page'
      18 | #define __pfn_to_page(pfn)      (mem_map + ((pfn) - ARCH_PFN_OFFSET))
         |                                              ^~~
   arch/arm/include/asm/memory.h:296:53: note: passing argument to parameter 'p' here
     296 | static inline unsigned long virt_to_pfn(const void *p)
         |                                                     ^
   kernel/trace/ring_buffer.c:5968:15: error: incompatible integer to pointer conversion passing 'unsigned long' to parameter of type 'const void *' [-Wint-conversion]
    5968 |         virt_to_page(addr)->mapping = NULL;
         |                      ^~~~
   arch/arm/include/asm/memory.h:390:53: note: expanded from macro 'virt_to_page'
     390 | #define virt_to_page(kaddr)     pfn_to_page(virt_to_pfn(kaddr))
         |                                                         ^~~~~
   include/asm-generic/memory_model.h:18:41: note: expanded from macro '__pfn_to_page'
      18 | #define __pfn_to_page(pfn)      (mem_map + ((pfn) - ARCH_PFN_OFFSET))
         |                                              ^~~
   arch/arm/include/asm/memory.h:296:53: note: passing argument to parameter 'p' here
     296 | static inline unsigned long virt_to_pfn(const void *p)
         |                                                     ^
   kernel/trace/ring_buffer.c:6156:22: error: incompatible integer to pointer conversion passing 'unsigned long' to parameter of type 'const void *' [-Wint-conversion]
    6156 |         return virt_to_page(cpu_buffer->page_ids[pgoff]);
         |                             ^~~~~~~~~~~~~~~~~~~~~~~~~~~
   arch/arm/include/asm/memory.h:390:53: note: expanded from macro 'virt_to_page'
     390 | #define virt_to_page(kaddr)     pfn_to_page(virt_to_pfn(kaddr))
         |                                                         ^~~~~
   include/asm-generic/memory_model.h:18:41: note: expanded from macro '__pfn_to_page'
      18 | #define __pfn_to_page(pfn)      (mem_map + ((pfn) - ARCH_PFN_OFFSET))
         |                                              ^~~
   arch/arm/include/asm/memory.h:296:53: note: passing argument to parameter 'p' here
     296 | static inline unsigned long virt_to_pfn(const void *p)
         |                                                     ^
   3 errors generated.


vim +5946 kernel/trace/ring_buffer.c

  5940	
  5941	static void rb_free_page_ids(struct ring_buffer_per_cpu *cpu_buffer)
  5942	{
  5943		int i;
  5944	
  5945		for (i = 0; i < cpu_buffer->nr_pages + 1; i++)
> 5946			virt_to_page(cpu_buffer->page_ids[i])->mapping = NULL;
  5947	
  5948		kfree(cpu_buffer->page_ids);
  5949		cpu_buffer->page_ids = NULL;
  5950	}
  5951
kernel test robot July 29, 2023, 3:44 a.m. UTC | #2
Hi Vincent,

kernel test robot noticed the following build warnings:

[auto build test WARNING on linus/master]
[cannot apply to rostedt-trace/for-next rostedt-trace/for-next-urgent]
[If your patch is applied to the wrong git tree, kindly drop us a note.
And when submitting patch, we suggest to use '--base' as documented in
https://git-scm.com/docs/git-format-patch#_base_tree_information]

url:    https://github.com/intel-lab-lkp/linux/commits/Vincent-Donnefort/ring-buffer-Introducing-ring-buffer-mapping-functions/20230729-005300
base:   linus/master
patch link:    https://lore.kernel.org/r/20230728164754.460767-2-vdonnefort%40google.com
patch subject: [PATCH v5 1/2] ring-buffer: Introducing ring-buffer mapping functions
config: arm-randconfig-r046-20230728 (https://download.01.org/0day-ci/archive/20230729/202307291143.HTPVZOsb-lkp@intel.com/config)
compiler: arm-linux-gnueabi-gcc (GCC) 12.3.0
reproduce: (https://download.01.org/0day-ci/archive/20230729/202307291143.HTPVZOsb-lkp@intel.com/reproduce)

If you fix the issue in a separate patch/commit (i.e. not just a new version of
the same patch/commit), kindly add following tags
| Reported-by: kernel test robot <lkp@intel.com>
| Closes: https://lore.kernel.org/oe-kbuild-all/202307291143.HTPVZOsb-lkp@intel.com/

All warnings (new ones prefixed by >>):

   In file included from arch/arm/include/asm/page.h:193,
                    from arch/arm/include/asm/thread_info.h:14,
                    from include/linux/thread_info.h:60,
                    from include/asm-generic/preempt.h:5,
                    from ./arch/arm/include/generated/asm/preempt.h:1,
                    from include/linux/preempt.h:79,
                    from include/linux/percpu.h:6,
                    from include/linux/context_tracking_state.h:5,
                    from include/linux/hardirq.h:5,
                    from include/linux/interrupt.h:11,
                    from include/linux/trace_recursion.h:5,
                    from kernel/trace/ring_buffer.c:7:
   kernel/trace/ring_buffer.c: In function 'rb_free_page_ids':
>> kernel/trace/ring_buffer.c:5946:50: warning: passing argument 1 of 'virt_to_pfn' makes pointer from integer without a cast [-Wint-conversion]
    5946 |                 virt_to_page(cpu_buffer->page_ids[i])->mapping = NULL;
         |                              ~~~~~~~~~~~~~~~~~~~~^~~
         |                                                  |
         |                                                  long unsigned int
   include/asm-generic/memory_model.h:18:46: note: in definition of macro '__pfn_to_page'
      18 | #define __pfn_to_page(pfn)      (mem_map + ((pfn) - ARCH_PFN_OFFSET))
         |                                              ^~~
   kernel/trace/ring_buffer.c:5946:17: note: in expansion of macro 'virt_to_page'
    5946 |                 virt_to_page(cpu_buffer->page_ids[i])->mapping = NULL;
         |                 ^~~~~~~~~~~~
   In file included from arch/arm/include/asm/page.h:188:
   arch/arm/include/asm/memory.h:296:53: note: expected 'const void *' but argument is of type 'long unsigned int'
     296 | static inline unsigned long virt_to_pfn(const void *p)
         |                                         ~~~~~~~~~~~~^
   kernel/trace/ring_buffer.c: In function 'rb_free_meta_page':
   kernel/trace/ring_buffer.c:5968:22: warning: passing argument 1 of 'virt_to_pfn' makes pointer from integer without a cast [-Wint-conversion]
    5968 |         virt_to_page(addr)->mapping = NULL;
         |                      ^~~~
         |                      |
         |                      long unsigned int
   include/asm-generic/memory_model.h:18:46: note: in definition of macro '__pfn_to_page'
      18 | #define __pfn_to_page(pfn)      (mem_map + ((pfn) - ARCH_PFN_OFFSET))
         |                                              ^~~
   kernel/trace/ring_buffer.c:5968:9: note: in expansion of macro 'virt_to_page'
    5968 |         virt_to_page(addr)->mapping = NULL;
         |         ^~~~~~~~~~~~
   arch/arm/include/asm/memory.h:296:53: note: expected 'const void *' but argument is of type 'long unsigned int'
     296 | static inline unsigned long virt_to_pfn(const void *p)
         |                                         ~~~~~~~~~~~~^
   kernel/trace/ring_buffer.c: In function 'ring_buffer_map_fault':
   kernel/trace/ring_buffer.c:6156:49: warning: passing argument 1 of 'virt_to_pfn' makes pointer from integer without a cast [-Wint-conversion]
    6156 |         return virt_to_page(cpu_buffer->page_ids[pgoff]);
         |                             ~~~~~~~~~~~~~~~~~~~~^~~~~~~
         |                                                 |
         |                                                 long unsigned int
   include/asm-generic/memory_model.h:18:46: note: in definition of macro '__pfn_to_page'
      18 | #define __pfn_to_page(pfn)      (mem_map + ((pfn) - ARCH_PFN_OFFSET))
         |                                              ^~~
   kernel/trace/ring_buffer.c:6156:16: note: in expansion of macro 'virt_to_page'
    6156 |         return virt_to_page(cpu_buffer->page_ids[pgoff]);
         |                ^~~~~~~~~~~~
   arch/arm/include/asm/memory.h:296:53: note: expected 'const void *' but argument is of type 'long unsigned int'
     296 | static inline unsigned long virt_to_pfn(const void *p)
         |                                         ~~~~~~~~~~~~^


vim +/virt_to_pfn +5946 kernel/trace/ring_buffer.c

  5940	
  5941	static void rb_free_page_ids(struct ring_buffer_per_cpu *cpu_buffer)
  5942	{
  5943		int i;
  5944	
  5945		for (i = 0; i < cpu_buffer->nr_pages + 1; i++)
> 5946			virt_to_page(cpu_buffer->page_ids[i])->mapping = NULL;
  5947	
  5948		kfree(cpu_buffer->page_ids);
  5949		cpu_buffer->page_ids = NULL;
  5950	}
  5951
Steven Rostedt Aug. 1, 2023, 5:26 p.m. UTC | #3
On Fri, 28 Jul 2023 17:47:53 +0100
Vincent Donnefort <vdonnefort@google.com> wrote:

> In preparation for allowing the user-space to map a ring-buffer, add
> a set of mapping functions:
> 
>   ring_buffer_{map,unmap}()
>   ring_buffer_map_fault()
> 
> And controls on the ring-buffer:
> 
>   ring_buffer_map_get_reader_page()  /* swap reader and head */
> 
> Mapping the ring-buffer also involves:
> 
>   A unique ID for each page of the ring-buffer, as currently the pages
>   are only identified through their in-kernel VA.
> 
>   A meta-page, where are stored statistics about the ring-buffer and
>   a page IDs list, ordered. A field gives what page is the reader
>   one and one to gives where the ring-buffer starts in the list of data
>   pages.
> 
> The linear mapping exposes the meta-page, and each page of the
> ring-buffer, ordered following their unique ID, assigned during the
> first mapping.
> 
> Once mapped, no page can get in or out of the ring-buffer: the buffer
> size will remain unmodified and the splice enabling functions will in
> reality simply memcpy the data instead of swapping pages.

So I tested these, and they look good. But I have some comments still.

> 
> Signed-off-by: Vincent Donnefort <vdonnefort@google.com>
> 
> diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h
> index 782e14f62201..980abfbd92ed 100644
> --- a/include/linux/ring_buffer.h
> +++ b/include/linux/ring_buffer.h
> @@ -6,6 +6,8 @@
>  #include <linux/seq_file.h>
>  #include <linux/poll.h>
>  
> +#include <uapi/linux/trace_mmap.h>
> +
>  struct trace_buffer;
>  struct ring_buffer_iter;
>  
> @@ -211,4 +213,9 @@ int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node);
>  #define trace_rb_cpu_prepare	NULL
>  #endif
>  
> +int ring_buffer_map(struct trace_buffer *buffer, int cpu);
> +int ring_buffer_unmap(struct trace_buffer *buffer, int cpu);
> +struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu,
> +				   unsigned long pgoff);
> +int ring_buffer_map_get_reader_page(struct trace_buffer *buffer, int cpu);
>  #endif /* _LINUX_RING_BUFFER_H */
> diff --git a/include/uapi/linux/trace_mmap.h b/include/uapi/linux/trace_mmap.h
> new file mode 100644
> index 000000000000..653176cc50bc
> --- /dev/null
> +++ b/include/uapi/linux/trace_mmap.h
> @@ -0,0 +1,26 @@
> +/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */
> +#ifndef _UAPI_TRACE_MMAP_H_
> +#define _UAPI_TRACE_MMAP_H_
> +
> +#include <linux/types.h>
> +
> +struct ring_buffer_meta {

To be consistent with the naming of the internal structure, let's call this:

 struct trace_buffer_meta {


> +	unsigned long	entries;
> +	unsigned long	overrun;
> +	unsigned long	read;
> +
> +	unsigned long	pages_touched;
> +	unsigned long	pages_lost;
> +	unsigned long	pages_read;
> +
> +	__u32		meta_page_size;

We still want this meta structure size exported. That way if we ever extend
the interface, the applications will know if the kernel supports it or not.

	__u32		meta_struct_len; ?


> +	__u32		nr_data_pages;	/* Number of pages in the ring-buffer */
> +
> +	struct reader_page {
> +		__u32	id;		/* Reader page ID from 0 to nr_data_pages - 1 */
> +		__u32	read;		/* Number of bytes read on the reader page */
> +		unsigned long	lost_events; /* Events lost at the time of the reader swap */
> +	} reader_page;

I think we should define the structure outside the other structure, and
also rename it as reader_page is too generic. Perhaps call it
trace_buffer_read_page.


> +};
> +
> +#endif /* _UAPI_TRACE_MMAP_H_ */
> diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
> index de061dd47313..8f367fd3dbdd 100644
> --- a/kernel/trace/ring_buffer.c
> +++ b/kernel/trace/ring_buffer.c
> @@ -332,6 +332,7 @@ struct buffer_page {
>  	local_t		 entries;	/* entries on this page */
>  	unsigned long	 real_end;	/* real end of data */
>  	struct buffer_data_page *page;	/* Actual data page */
> +	u32		 id;		/* ID for external mapping */

I noticed that we have a whole in the current structure for 64 bit systems:

struct buffer_page {
	struct list_head list;		16 bytes depending on arch
	local_t		 write;		4 bytes
	unsigned	 read;		4 bytes
	local_t		 entries;	4 bytes

[ 4 byte whole on 64 bit archs ]

	unsigned long	 real_end;	8 bytes
	struct buffer_data_page *page;	8 bytes
};

If we put the id after entries, it will fill that whole.

struct buffer_page {
	struct list_head list;		/* list of buffer pages */
	local_t		 write;		/* index for next write */
	unsigned	 read;		/* index for next read */
	local_t		 entries;	/* entries on this page */
	u32		 id;		/* ID for external mapping */
	unsigned long	 real_end;	/* real end of data */
	struct buffer_data_page *page;	/* Actual data page */
};


>  };
>  
>  /*
> @@ -523,6 +524,12 @@ struct ring_buffer_per_cpu {
>  	rb_time_t			before_stamp;
>  	u64				event_stamp[MAX_NEST];
>  	u64				read_stamp;
> +
> +	int				mapped;
> +	struct mutex			mapping_lock;
> +	unsigned long			*page_ids;	/* ID to addr */
> +	struct ring_buffer_meta		*meta_page;
> +
>  	/* ring buffer pages to update, > 0 to add, < 0 to remove */
>  	long				nr_pages_to_update;
>  	struct list_head		new_pages; /* new pages to add */
> @@ -1562,6 +1569,13 @@ static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
>  		/* Again, either we update tail_page or an interrupt does */
>  		(void)cmpxchg(&cpu_buffer->tail_page, tail_page, next_page);
>  	}
> +
> +	if (READ_ONCE(cpu_buffer->mapped)) {
> +		/* Ensure the meta_page is ready */
> +		smp_rmb();
> +		WRITE_ONCE(cpu_buffer->meta_page->pages_touched,
> +			   local_read(&cpu_buffer->pages_touched));
> +	}

I was thinking instead of doing this in the semi fast path, put this logic
into the rb_wakeup_waiters() code. That is, if a task is mapped, we call
the irq_work() to do this for us. It could even do more, like handle
blocked mapped waiters.

>  }
>  
>  static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
> @@ -1725,6 +1739,7 @@ rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
>  	init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
>  	init_waitqueue_head(&cpu_buffer->irq_work.waiters);
>  	init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
> +	mutex_init(&cpu_buffer->mapping_lock);
>  
>  	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
>  			    GFP_KERNEL, cpu_to_node(cpu));
> @@ -2521,6 +2536,15 @@ rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
>  		local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);
>  		local_inc(&cpu_buffer->pages_lost);
>  
> +		if (READ_ONCE(cpu_buffer->mapped)) {
> +			/* Ensure the meta_page is ready */
> +			smp_rmb();
> +			WRITE_ONCE(cpu_buffer->meta_page->overrun,
> +				   local_read(&cpu_buffer->overrun));
> +			WRITE_ONCE(cpu_buffer->meta_page->pages_lost,
> +				   local_read(&cpu_buffer->pages_lost));
> +		}
> +

Perhaps this too could be handled in the irq work?

>  		/*
>  		 * The entries will be zeroed out when we move the
>  		 * tail page.
> @@ -3183,6 +3207,14 @@ static inline void rb_event_discard(struct ring_buffer_event *event)
>  static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer)
>  {
>  	local_inc(&cpu_buffer->entries);
> +
> +	if (READ_ONCE(cpu_buffer->mapped)) {
> +		/* Ensure the meta_page is ready */
> +		smp_rmb();
> +		WRITE_ONCE(cpu_buffer->meta_page->entries,
> +			   local_read(&cpu_buffer->entries));
> +	}

As well as this.

In other words, since the irq_work will trigger when something is waiting
for it, it could handle all the updates.

> +
>  	rb_end_commit(cpu_buffer);
>  }
>  
> @@ -3486,7 +3518,7 @@ static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
>  		return;
>  
>  	/*
> -	 * If this interrupted another event, 
> +	 * If this interrupted another event,
>  	 */
>  	if (atomic_inc_return(this_cpu_ptr(&checking)) != 1)
>  		goto out;
> @@ -4658,6 +4690,13 @@ rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
>  		cpu_buffer->last_overrun = overwrite;
>  	}
>  
> +	if (cpu_buffer->mapped) {
> +		WRITE_ONCE(cpu_buffer->meta_page->reader_page.read, 0);
> +		WRITE_ONCE(cpu_buffer->meta_page->reader_page.id, reader->id);
> +		WRITE_ONCE(cpu_buffer->meta_page->reader_page.lost_events, cpu_buffer->lost_events);
> +		WRITE_ONCE(cpu_buffer->meta_page->pages_read, local_read(&cpu_buffer->pages_read));
> +	}
> +
>  	goto again;
>  
>   out:
> @@ -4724,6 +4763,13 @@ static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
>  
>  	length = rb_event_length(event);
>  	cpu_buffer->reader_page->read += length;
> +
> +	if (cpu_buffer->mapped) {
> +		WRITE_ONCE(cpu_buffer->meta_page->reader_page.read,
> +			   cpu_buffer->reader_page->read);
> +		WRITE_ONCE(cpu_buffer->meta_page->read,
> +			   cpu_buffer->read);
> +	}
>  }
>  
>  static void rb_advance_iter(struct ring_buffer_iter *iter)
> @@ -5253,6 +5299,19 @@ static void rb_clear_buffer_page(struct buffer_page *page)
>  	page->read = 0;
>  }
>  
> +static void rb_reset_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
> +{
> +	struct ring_buffer_meta *meta = cpu_buffer->meta_page;
> +
> +	WRITE_ONCE(meta->entries, 0);
> +	WRITE_ONCE(meta->overrun, 0);
> +	WRITE_ONCE(meta->read, cpu_buffer->read);
> +	WRITE_ONCE(meta->pages_touched, 0);
> +	WRITE_ONCE(meta->pages_lost, 0);
> +	WRITE_ONCE(meta->pages_read, local_read(&cpu_buffer->pages_read));
> +	WRITE_ONCE(meta->reader_page.read, cpu_buffer->reader_page->read);
> +}
> +
>  static void
>  rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
>  {
> @@ -5297,6 +5356,9 @@ rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
>  	cpu_buffer->lost_events = 0;
>  	cpu_buffer->last_overrun = 0;
>  
> +	if (cpu_buffer->mapped)
> +		rb_reset_meta_page(cpu_buffer);
> +
>  	rb_head_page_activate(cpu_buffer);
>  }
>  
> @@ -5511,6 +5573,11 @@ int ring_buffer_swap_cpu(struct trace_buffer *buffer_a,
>  	cpu_buffer_a = buffer_a->buffers[cpu];
>  	cpu_buffer_b = buffer_b->buffers[cpu];
>  
> +	if (READ_ONCE(cpu_buffer_a->mapped) || READ_ONCE(cpu_buffer_b->mapped)) {
> +		ret = -EBUSY;
> +		goto out;
> +	}
> +
>  	/* At least make sure the two buffers are somewhat the same */
>  	if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
>  		goto out;
> @@ -5753,7 +5820,8 @@ int ring_buffer_read_page(struct trace_buffer *buffer,
>  	 * Otherwise, we can simply swap the page with the one passed in.
>  	 */
>  	if (read || (len < (commit - read)) ||
> -	    cpu_buffer->reader_page == cpu_buffer->commit_page) {
> +	    cpu_buffer->reader_page == cpu_buffer->commit_page ||
> +	    cpu_buffer->mapped) {
>  		struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
>  		unsigned int rpos = read;
>  		unsigned int pos = 0;
> @@ -5870,6 +5938,255 @@ int ring_buffer_read_page(struct trace_buffer *buffer,
>  }
>  EXPORT_SYMBOL_GPL(ring_buffer_read_page);
>  
> +static void rb_free_page_ids(struct ring_buffer_per_cpu *cpu_buffer)
> +{
> +	int i;
> +
> +	for (i = 0; i < cpu_buffer->nr_pages + 1; i++)
> +		virt_to_page(cpu_buffer->page_ids[i])->mapping = NULL;
> +
> +	kfree(cpu_buffer->page_ids);
> +	cpu_buffer->page_ids = NULL;
> +}
> +
> +static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
> +{
> +	if (cpu_buffer->meta_page)
> +		return 0;
> +
> +	cpu_buffer->meta_page = page_to_virt(alloc_page(GFP_USER));
> +	if (!cpu_buffer->meta_page)
> +		return -ENOMEM;
> +
> +	return 0;
> +}
> +
> +static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
> +{
> +	unsigned long addr = (unsigned long)cpu_buffer->meta_page;
> +
> +	virt_to_page(addr)->mapping = NULL;
> +	free_page(addr);
> +	cpu_buffer->meta_page = NULL;
> +}
> +
> +static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer,
> +				   unsigned long *page_ids)
> +{
> +	struct ring_buffer_meta *meta = cpu_buffer->meta_page;
> +	unsigned int nr_data_pages = cpu_buffer->nr_pages + 1;
> +	struct buffer_page *first_page, *bpage;
> +	int id = 0;
> +
> +	page_ids[id] = (unsigned long)cpu_buffer->reader_page->page;
> +	cpu_buffer->reader_page->id = id++;
> +
> +	first_page = bpage = rb_set_head_page(cpu_buffer);
> +	do {
> +		if (id >= nr_data_pages) {
> +			WARN_ON(1);
> +			break;
> +		}
> +
> +		page_ids[id] = (unsigned long)bpage->page;
> +		bpage->id = id;
> +
> +		rb_inc_page(&bpage);
> +		id++;
> +	} while (bpage != first_page);
> +
> +	/* install page ID to kern VA translation */
> +	cpu_buffer->page_ids = page_ids;
> +
> +	meta->meta_page_size = PAGE_SIZE;
> +	meta->nr_data_pages = nr_data_pages;
> +	meta->reader_page.id = cpu_buffer->reader_page->id;
> +	rb_reset_meta_page(cpu_buffer);
> +}
> +
> +static inline struct ring_buffer_per_cpu *
> +rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu)
> +{
> +	struct ring_buffer_per_cpu *cpu_buffer;
> +
> +	if (!cpumask_test_cpu(cpu, buffer->cpumask))
> +		return ERR_PTR(-EINVAL);
> +
> +	cpu_buffer = buffer->buffers[cpu];
> +
> +	mutex_lock(&cpu_buffer->mapping_lock);
> +
> +	if (!cpu_buffer->mapped) {
> +		mutex_unlock(&cpu_buffer->mapping_lock);
> +		return ERR_PTR(-ENODEV);
> +	}
> +
> +	return cpu_buffer;
> +}
> +
> +static inline void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer)
> +{
> +	mutex_unlock(&cpu_buffer->mapping_lock);
> +}
> +
> +int ring_buffer_map(struct trace_buffer *buffer, int cpu)
> +{
> +	struct ring_buffer_per_cpu *cpu_buffer;
> +	unsigned long flags, *page_ids;
> +	int err = 0;
> +
> +	if (!cpumask_test_cpu(cpu, buffer->cpumask))
> +		return -EINVAL;
> +
> +	cpu_buffer = buffer->buffers[cpu];
> +
> +	mutex_lock(&cpu_buffer->mapping_lock);
> +
> +	if (cpu_buffer->mapped) {
> +		WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped + 1);
> +		goto unlock;
> +	}
> +
> +	/* prevent another thread from changing buffer sizes */
> +	mutex_lock(&buffer->mutex);
> +	atomic_inc(&cpu_buffer->resize_disabled);
> +	mutex_unlock(&buffer->mutex);
> +
> +	err = rb_alloc_meta_page(cpu_buffer);
> +	if (err) {
> +		atomic_dec(&cpu_buffer->resize_disabled);
> +		goto unlock;
> +	}
> +
> +	/* page_ids include the reader page while nr_pages does not */
> +	page_ids = kzalloc(sizeof(*page_ids) * (cpu_buffer->nr_pages + 1),
> +			   GFP_KERNEL);
> +	if (!page_ids) {
> +		rb_free_meta_page(cpu_buffer);
> +		atomic_dec(&cpu_buffer->resize_disabled);
> +		err = -ENOMEM;
> +		goto unlock;
> +	}
> +
> +	/*
> +	 * Lock all readers to block any page swap until the page IDs are
> +	 * assigned.
> +	 */
> +	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
> +
> +	rb_setup_ids_meta_page(cpu_buffer, page_ids);
> +	/*
> +	 * Ensure the writer will observe the meta-page before
> +	 * cpu_buffer->mapped.
> +	 */
> +	smp_wmb();
> +	WRITE_ONCE(cpu_buffer->mapped, 1);
> +
> +	/* Init meta_page values unless the writer did it already */
> +	cmpxchg(&cpu_buffer->meta_page->entries, 0,
> +		local_read(&cpu_buffer->entries));
> +	cmpxchg(&cpu_buffer->meta_page->overrun, 0,
> +		local_read(&cpu_buffer->overrun));
> +	cmpxchg(&cpu_buffer->meta_page->pages_touched, 0,
> +		local_read(&cpu_buffer->pages_touched));
> +	cmpxchg(&cpu_buffer->meta_page->pages_lost, 0,
> +		local_read(&cpu_buffer->pages_lost));
> +
> +	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
> +unlock:
> +	mutex_unlock(&cpu_buffer->mapping_lock);
> +
> +	return err;
> +}
> +
> +int ring_buffer_unmap(struct trace_buffer *buffer, int cpu)
> +{
> +	struct ring_buffer_per_cpu *cpu_buffer;
> +	int err = 0;
> +
> +	if (!cpumask_test_cpu(cpu, buffer->cpumask))
> +		return -EINVAL;
> +
> +	cpu_buffer = buffer->buffers[cpu];
> +
> +	mutex_lock(&cpu_buffer->mapping_lock);
> +
> +	if (!cpu_buffer->mapped) {
> +		err = -ENODEV;
> +		goto unlock;
> +	}
> +
> +	WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped - 1);
> +	if (!cpu_buffer->mapped) {
> +		/* Wait the writer and readers to observe !mapped */
> +		synchronize_rcu();
> +
> +		rb_free_page_ids(cpu_buffer);
> +		rb_free_meta_page(cpu_buffer);
> +		atomic_dec(&cpu_buffer->resize_disabled);
> +	}
> +
> +unlock:
> +	mutex_unlock(&cpu_buffer->mapping_lock);
> +
> +	return err;
> +}
> +
> +/*
> + *   +--------------+
> + *   |   meta page  |  pgoff=0
> + *   +--------------+
> + *   |  data page1  |  page_ids=0
> + *   +--------------+
> + *   |  data page2  |  page_ids=1
> + *         ...
> + */
> +struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu,
> +				   unsigned long pgoff)
> +{
> +	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
> +
> +	if (!pgoff)
> +		return virt_to_page((void *)cpu_buffer->meta_page);
> +
> +	pgoff--;
> +	if (pgoff > cpu_buffer->nr_pages)
> +		return NULL;
> +
> +	return virt_to_page(cpu_buffer->page_ids[pgoff]);
> +}
> +
> +int ring_buffer_map_get_reader_page(struct trace_buffer *buffer, int cpu)
> +{
> +	struct ring_buffer_per_cpu *cpu_buffer;
> +	unsigned long reader_size, flags;

Please put variable declarations on separate lines.

	unsigned long reader_size;
	unsigned long flags;

> +
> +	cpu_buffer = rb_get_mapped_buffer(buffer, cpu);
> +	if (IS_ERR(cpu_buffer))
> +		return (int)PTR_ERR(cpu_buffer);
> +
> +	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
> +consume:
> +	if (rb_per_cpu_empty(cpu_buffer))
> +		goto out;
> +	reader_size = rb_page_size(cpu_buffer->reader_page);
> +	if (cpu_buffer->reader_page->read < reader_size) {

Please add a comment to what is going on here. I'm assuming that this is to
finish reading the reader page?

> +		while (cpu_buffer->reader_page->read < reader_size)
> +			rb_advance_reader(cpu_buffer);
> +		goto out;
> +	}
> +
> +	if (WARN_ON(!rb_get_reader_page(cpu_buffer)))
> +		goto out;
> +
> +	goto consume;
> +out:
> +	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
> +	rb_put_mapped_buffer(cpu_buffer);
> +
> +	return 0;
> +}
> +
>  /*
>   * We only allocate new buffers, never free them if the CPU goes down.
>   * If we were to free the buffer, then the user would lose any trace that was in

Thanks,

-- Steve
Steven Rostedt Aug. 2, 2023, 11:45 a.m. UTC | #4
On Tue, 1 Aug 2023 13:26:03 -0400
Steven Rostedt <rostedt@goodmis.org> wrote:

> > +
> > +	if (READ_ONCE(cpu_buffer->mapped)) {
> > +		/* Ensure the meta_page is ready */
> > +		smp_rmb();
> > +		WRITE_ONCE(cpu_buffer->meta_page->pages_touched,
> > +			   local_read(&cpu_buffer->pages_touched));
> > +	}  
> 
> I was thinking instead of doing this in the semi fast path, put this logic
> into the rb_wakeup_waiters() code. That is, if a task is mapped, we call
> the irq_work() to do this for us. It could even do more, like handle
> blocked mapped waiters.

I was thinking how to implement this, and I worry that it may cause an irq
storm. Let's keep this (and the other locations) as is, where we do the
updates in place. Then we can look at seeing if it is possible to do it in
a delayed fashion another time.

-- Steve
Vincent Donnefort Aug. 2, 2023, 12:30 p.m. UTC | #5
On Wed, Aug 02, 2023 at 07:45:26AM -0400, Steven Rostedt wrote:
> On Tue, 1 Aug 2023 13:26:03 -0400
> Steven Rostedt <rostedt@goodmis.org> wrote:
> 
> > > +
> > > +	if (READ_ONCE(cpu_buffer->mapped)) {
> > > +		/* Ensure the meta_page is ready */
> > > +		smp_rmb();
> > > +		WRITE_ONCE(cpu_buffer->meta_page->pages_touched,
> > > +			   local_read(&cpu_buffer->pages_touched));
> > > +	}  
> > 
> > I was thinking instead of doing this in the semi fast path, put this logic
> > into the rb_wakeup_waiters() code. That is, if a task is mapped, we call
> > the irq_work() to do this for us. It could even do more, like handle
> > blocked mapped waiters.
> 
> I was thinking how to implement this, and I worry that it may cause an irq
> storm. Let's keep this (and the other locations) as is, where we do the
> updates in place. Then we can look at seeing if it is possible to do it in
> a delayed fashion another time.

I actually looking at this. How about:

On the userspace side, a simple poll:

  static void wait_entries(int fd)
  {
          struct pollfd pollfd = {
                  .fd     = fd,
                  .events = POLLIN,
          };
  
          if (poll(&pollfd, 1, -1) == -1)
                  pdie("poll");
  }

And on the kernel side, just a function to update the "writer fields" of the
meta-page:

   static void rb_wake_up_waiters(struct irq_work *work)
   {
          struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
  +       struct ring_buffer_per_cpu *cpu_buffer =
  +               container_of(rbwork, struct ring_buffer_per_cpu, irq_work);
  +
  +       rb_update_meta_page(cpu_buffer);
   
          wake_up_all(&rbwork->waiters);

That would rate limit the number of updates to the meta-page without any irq storm?

> 
> -- Steve
Steven Rostedt Aug. 2, 2023, 3:13 p.m. UTC | #6
On Wed, 2 Aug 2023 13:30:56 +0100
Vincent Donnefort <vdonnefort@google.com> wrote:

> On Wed, Aug 02, 2023 at 07:45:26AM -0400, Steven Rostedt wrote:
> > On Tue, 1 Aug 2023 13:26:03 -0400
> > Steven Rostedt <rostedt@goodmis.org> wrote:
> >   
> > > > +
> > > > +	if (READ_ONCE(cpu_buffer->mapped)) {
> > > > +		/* Ensure the meta_page is ready */
> > > > +		smp_rmb();
> > > > +		WRITE_ONCE(cpu_buffer->meta_page->pages_touched,
> > > > +			   local_read(&cpu_buffer->pages_touched));
> > > > +	}    
> > > 
> > > I was thinking instead of doing this in the semi fast path, put this logic
> > > into the rb_wakeup_waiters() code. That is, if a task is mapped, we call
> > > the irq_work() to do this for us. It could even do more, like handle
> > > blocked mapped waiters.  
> > 
> > I was thinking how to implement this, and I worry that it may cause an irq
> > storm. Let's keep this (and the other locations) as is, where we do the
> > updates in place. Then we can look at seeing if it is possible to do it in
> > a delayed fashion another time.  
> 
> I actually looking at this. How about:
> 
> On the userspace side, a simple poll:
> 
>   static void wait_entries(int fd)
>   {
>           struct pollfd pollfd = {
>                   .fd     = fd,
>                   .events = POLLIN,
>           };
>   
>           if (poll(&pollfd, 1, -1) == -1)
>                   pdie("poll");
>   }
> 
> And on the kernel side, just a function to update the "writer fields" of the
> meta-page:
> 
>    static void rb_wake_up_waiters(struct irq_work *work)
>    {
>           struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
>   +       struct ring_buffer_per_cpu *cpu_buffer =
>   +               container_of(rbwork, struct ring_buffer_per_cpu, irq_work);
>   +
>   +       rb_update_meta_page(cpu_buffer);
>    
>           wake_up_all(&rbwork->waiters);
> 
> That would rate limit the number of updates to the meta-page without any irq storm?
> 

Is poll an issue? It requires user space to do a system call to see if
there's more data? But I guess that's not too much of an issue, as it needs
to do the ioctl to get the reader page.

We could also add an option to the ioctl to block, or have the ioctl honor
the NON_BLOCK flags of the fd?

-- Steve
Vincent Donnefort Aug. 3, 2023, 10:33 a.m. UTC | #7
[...]

> > And on the kernel side, just a function to update the "writer fields" of the
> > meta-page:
> > 
> >    static void rb_wake_up_waiters(struct irq_work *work)
> >    {
> >           struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
> >   +       struct ring_buffer_per_cpu *cpu_buffer =
> >   +               container_of(rbwork, struct ring_buffer_per_cpu, irq_work);
> >   +
> >   +       rb_update_meta_page(cpu_buffer);
> >    
> >           wake_up_all(&rbwork->waiters);
> > 
> > That would rate limit the number of updates to the meta-page without any irq storm?
> > 
> 
> Is poll an issue? It requires user space to do a system call to see if
> there's more data? But I guess that's not too much of an issue, as it needs
> to do the ioctl to get the reader page.

I don't think there's any problem with this approach, beside the extra system
call...

> 
> We could also add an option to the ioctl to block, or have the ioctl honor
> the NON_BLOCK flags of the fd?

... but indeed, we could block there. The userspace interface would be even simpler.
How about?

  +++ b/kernel/trace/trace.c
  @@ -8499,12 +8499,22 @@ static long tracing_buffers_ioctl(struct file *file, unsigned int cmd, unsigned
   {
          struct ftrace_buffer_info *info = file->private_data;
          struct trace_iterator *iter = &info->iter;
  +       int err;
  +
  +       if (cmd == TRACE_MMAP_IOCTL_GET_READER_PAGE) {
  +               if (!(file->f_flags & O_NONBLOCK)) {
  +                       err = ring_buffer_wait(iter->array_buffer->buffer,
  +                                              iter->cpu_file,
  +                                              iter->tr->buffer_percent);
  +                       if (err)
  +                               return err;
  +               }
   
  -       if (cmd == TRACE_MMAP_IOCTL_GET_READER_PAGE)
                  return ring_buffer_map_get_reader_page(iter->array_buffer->buffer,
                                                         iter->cpu_file);

> 
> -- Steve
> 
> -- 
> To unsubscribe from this group and stop receiving emails from it, send an email to kernel-team+unsubscribe@android.com.
>
Steven Rostedt Aug. 3, 2023, 2:52 p.m. UTC | #8
On Thu, 3 Aug 2023 11:33:39 +0100
Vincent Donnefort <vdonnefort@google.com> wrote:

> [...]
> 
> > > And on the kernel side, just a function to update the "writer fields" of the
> > > meta-page:
> > > 
> > >    static void rb_wake_up_waiters(struct irq_work *work)
> > >    {
> > >           struct rb_irq_work *rbwork = container_of(work, struct rb_irq_work, work);
> > >   +       struct ring_buffer_per_cpu *cpu_buffer =
> > >   +               container_of(rbwork, struct ring_buffer_per_cpu, irq_work);
> > >   +
> > >   +       rb_update_meta_page(cpu_buffer);
> > >    
> > >           wake_up_all(&rbwork->waiters);
> > > 
> > > That would rate limit the number of updates to the meta-page without any irq storm?
> > >   
> > 
> > Is poll an issue? It requires user space to do a system call to see if
> > there's more data? But I guess that's not too much of an issue, as it needs
> > to do the ioctl to get the reader page.  
> 
> I don't think there's any problem with this approach, beside the extra system
> call...
> 
> > 
> > We could also add an option to the ioctl to block, or have the ioctl honor
> > the NON_BLOCK flags of the fd?  
> 
> ... but indeed, we could block there. The userspace interface would be even simpler.
> How about?
> 
>   +++ b/kernel/trace/trace.c
>   @@ -8499,12 +8499,22 @@ static long tracing_buffers_ioctl(struct file *file, unsigned int cmd, unsigned
>    {
>           struct ftrace_buffer_info *info = file->private_data;
>           struct trace_iterator *iter = &info->iter;
>   +       int err;
>   +
>   +       if (cmd == TRACE_MMAP_IOCTL_GET_READER_PAGE) {
>   +               if (!(file->f_flags & O_NONBLOCK)) {
>   +                       err = ring_buffer_wait(iter->array_buffer->buffer,
>   +                                              iter->cpu_file,
>   +                                              iter->tr->buffer_percent);
>   +                       if (err)
>   +                               return err;
>   +               }
>    
>   -       if (cmd == TRACE_MMAP_IOCTL_GET_READER_PAGE)
>                   return ring_buffer_map_get_reader_page(iter->array_buffer->buffer,
>                                                          iter->cpu_file);
> 

Looks good to me.

-- Steve
diff mbox series

Patch

diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h
index 782e14f62201..980abfbd92ed 100644
--- a/include/linux/ring_buffer.h
+++ b/include/linux/ring_buffer.h
@@ -6,6 +6,8 @@ 
 #include <linux/seq_file.h>
 #include <linux/poll.h>
 
+#include <uapi/linux/trace_mmap.h>
+
 struct trace_buffer;
 struct ring_buffer_iter;
 
@@ -211,4 +213,9 @@  int trace_rb_cpu_prepare(unsigned int cpu, struct hlist_node *node);
 #define trace_rb_cpu_prepare	NULL
 #endif
 
+int ring_buffer_map(struct trace_buffer *buffer, int cpu);
+int ring_buffer_unmap(struct trace_buffer *buffer, int cpu);
+struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu,
+				   unsigned long pgoff);
+int ring_buffer_map_get_reader_page(struct trace_buffer *buffer, int cpu);
 #endif /* _LINUX_RING_BUFFER_H */
diff --git a/include/uapi/linux/trace_mmap.h b/include/uapi/linux/trace_mmap.h
new file mode 100644
index 000000000000..653176cc50bc
--- /dev/null
+++ b/include/uapi/linux/trace_mmap.h
@@ -0,0 +1,26 @@ 
+/* SPDX-License-Identifier: GPL-2.0 WITH Linux-syscall-note */
+#ifndef _UAPI_TRACE_MMAP_H_
+#define _UAPI_TRACE_MMAP_H_
+
+#include <linux/types.h>
+
+struct ring_buffer_meta {
+	unsigned long	entries;
+	unsigned long	overrun;
+	unsigned long	read;
+
+	unsigned long	pages_touched;
+	unsigned long	pages_lost;
+	unsigned long	pages_read;
+
+	__u32		meta_page_size;
+	__u32		nr_data_pages;	/* Number of pages in the ring-buffer */
+
+	struct reader_page {
+		__u32	id;		/* Reader page ID from 0 to nr_data_pages - 1 */
+		__u32	read;		/* Number of bytes read on the reader page */
+		unsigned long	lost_events; /* Events lost at the time of the reader swap */
+	} reader_page;
+};
+
+#endif /* _UAPI_TRACE_MMAP_H_ */
diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index de061dd47313..8f367fd3dbdd 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -332,6 +332,7 @@  struct buffer_page {
 	local_t		 entries;	/* entries on this page */
 	unsigned long	 real_end;	/* real end of data */
 	struct buffer_data_page *page;	/* Actual data page */
+	u32		 id;		/* ID for external mapping */
 };
 
 /*
@@ -523,6 +524,12 @@  struct ring_buffer_per_cpu {
 	rb_time_t			before_stamp;
 	u64				event_stamp[MAX_NEST];
 	u64				read_stamp;
+
+	int				mapped;
+	struct mutex			mapping_lock;
+	unsigned long			*page_ids;	/* ID to addr */
+	struct ring_buffer_meta		*meta_page;
+
 	/* ring buffer pages to update, > 0 to add, < 0 to remove */
 	long				nr_pages_to_update;
 	struct list_head		new_pages; /* new pages to add */
@@ -1562,6 +1569,13 @@  static void rb_tail_page_update(struct ring_buffer_per_cpu *cpu_buffer,
 		/* Again, either we update tail_page or an interrupt does */
 		(void)cmpxchg(&cpu_buffer->tail_page, tail_page, next_page);
 	}
+
+	if (READ_ONCE(cpu_buffer->mapped)) {
+		/* Ensure the meta_page is ready */
+		smp_rmb();
+		WRITE_ONCE(cpu_buffer->meta_page->pages_touched,
+			   local_read(&cpu_buffer->pages_touched));
+	}
 }
 
 static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
@@ -1725,6 +1739,7 @@  rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
 	init_irq_work(&cpu_buffer->irq_work.work, rb_wake_up_waiters);
 	init_waitqueue_head(&cpu_buffer->irq_work.waiters);
 	init_waitqueue_head(&cpu_buffer->irq_work.full_waiters);
+	mutex_init(&cpu_buffer->mapping_lock);
 
 	bpage = kzalloc_node(ALIGN(sizeof(*bpage), cache_line_size()),
 			    GFP_KERNEL, cpu_to_node(cpu));
@@ -2521,6 +2536,15 @@  rb_handle_head_page(struct ring_buffer_per_cpu *cpu_buffer,
 		local_sub(BUF_PAGE_SIZE, &cpu_buffer->entries_bytes);
 		local_inc(&cpu_buffer->pages_lost);
 
+		if (READ_ONCE(cpu_buffer->mapped)) {
+			/* Ensure the meta_page is ready */
+			smp_rmb();
+			WRITE_ONCE(cpu_buffer->meta_page->overrun,
+				   local_read(&cpu_buffer->overrun));
+			WRITE_ONCE(cpu_buffer->meta_page->pages_lost,
+				   local_read(&cpu_buffer->pages_lost));
+		}
+
 		/*
 		 * The entries will be zeroed out when we move the
 		 * tail page.
@@ -3183,6 +3207,14 @@  static inline void rb_event_discard(struct ring_buffer_event *event)
 static void rb_commit(struct ring_buffer_per_cpu *cpu_buffer)
 {
 	local_inc(&cpu_buffer->entries);
+
+	if (READ_ONCE(cpu_buffer->mapped)) {
+		/* Ensure the meta_page is ready */
+		smp_rmb();
+		WRITE_ONCE(cpu_buffer->meta_page->entries,
+			   local_read(&cpu_buffer->entries));
+	}
+
 	rb_end_commit(cpu_buffer);
 }
 
@@ -3486,7 +3518,7 @@  static void check_buffer(struct ring_buffer_per_cpu *cpu_buffer,
 		return;
 
 	/*
-	 * If this interrupted another event, 
+	 * If this interrupted another event,
 	 */
 	if (atomic_inc_return(this_cpu_ptr(&checking)) != 1)
 		goto out;
@@ -4658,6 +4690,13 @@  rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
 		cpu_buffer->last_overrun = overwrite;
 	}
 
+	if (cpu_buffer->mapped) {
+		WRITE_ONCE(cpu_buffer->meta_page->reader_page.read, 0);
+		WRITE_ONCE(cpu_buffer->meta_page->reader_page.id, reader->id);
+		WRITE_ONCE(cpu_buffer->meta_page->reader_page.lost_events, cpu_buffer->lost_events);
+		WRITE_ONCE(cpu_buffer->meta_page->pages_read, local_read(&cpu_buffer->pages_read));
+	}
+
 	goto again;
 
  out:
@@ -4724,6 +4763,13 @@  static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
 
 	length = rb_event_length(event);
 	cpu_buffer->reader_page->read += length;
+
+	if (cpu_buffer->mapped) {
+		WRITE_ONCE(cpu_buffer->meta_page->reader_page.read,
+			   cpu_buffer->reader_page->read);
+		WRITE_ONCE(cpu_buffer->meta_page->read,
+			   cpu_buffer->read);
+	}
 }
 
 static void rb_advance_iter(struct ring_buffer_iter *iter)
@@ -5253,6 +5299,19 @@  static void rb_clear_buffer_page(struct buffer_page *page)
 	page->read = 0;
 }
 
+static void rb_reset_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	struct ring_buffer_meta *meta = cpu_buffer->meta_page;
+
+	WRITE_ONCE(meta->entries, 0);
+	WRITE_ONCE(meta->overrun, 0);
+	WRITE_ONCE(meta->read, cpu_buffer->read);
+	WRITE_ONCE(meta->pages_touched, 0);
+	WRITE_ONCE(meta->pages_lost, 0);
+	WRITE_ONCE(meta->pages_read, local_read(&cpu_buffer->pages_read));
+	WRITE_ONCE(meta->reader_page.read, cpu_buffer->reader_page->read);
+}
+
 static void
 rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
 {
@@ -5297,6 +5356,9 @@  rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
 	cpu_buffer->lost_events = 0;
 	cpu_buffer->last_overrun = 0;
 
+	if (cpu_buffer->mapped)
+		rb_reset_meta_page(cpu_buffer);
+
 	rb_head_page_activate(cpu_buffer);
 }
 
@@ -5511,6 +5573,11 @@  int ring_buffer_swap_cpu(struct trace_buffer *buffer_a,
 	cpu_buffer_a = buffer_a->buffers[cpu];
 	cpu_buffer_b = buffer_b->buffers[cpu];
 
+	if (READ_ONCE(cpu_buffer_a->mapped) || READ_ONCE(cpu_buffer_b->mapped)) {
+		ret = -EBUSY;
+		goto out;
+	}
+
 	/* At least make sure the two buffers are somewhat the same */
 	if (cpu_buffer_a->nr_pages != cpu_buffer_b->nr_pages)
 		goto out;
@@ -5753,7 +5820,8 @@  int ring_buffer_read_page(struct trace_buffer *buffer,
 	 * Otherwise, we can simply swap the page with the one passed in.
 	 */
 	if (read || (len < (commit - read)) ||
-	    cpu_buffer->reader_page == cpu_buffer->commit_page) {
+	    cpu_buffer->reader_page == cpu_buffer->commit_page ||
+	    cpu_buffer->mapped) {
 		struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
 		unsigned int rpos = read;
 		unsigned int pos = 0;
@@ -5870,6 +5938,255 @@  int ring_buffer_read_page(struct trace_buffer *buffer,
 }
 EXPORT_SYMBOL_GPL(ring_buffer_read_page);
 
+static void rb_free_page_ids(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	int i;
+
+	for (i = 0; i < cpu_buffer->nr_pages + 1; i++)
+		virt_to_page(cpu_buffer->page_ids[i])->mapping = NULL;
+
+	kfree(cpu_buffer->page_ids);
+	cpu_buffer->page_ids = NULL;
+}
+
+static int rb_alloc_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	if (cpu_buffer->meta_page)
+		return 0;
+
+	cpu_buffer->meta_page = page_to_virt(alloc_page(GFP_USER));
+	if (!cpu_buffer->meta_page)
+		return -ENOMEM;
+
+	return 0;
+}
+
+static void rb_free_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	unsigned long addr = (unsigned long)cpu_buffer->meta_page;
+
+	virt_to_page(addr)->mapping = NULL;
+	free_page(addr);
+	cpu_buffer->meta_page = NULL;
+}
+
+static void rb_setup_ids_meta_page(struct ring_buffer_per_cpu *cpu_buffer,
+				   unsigned long *page_ids)
+{
+	struct ring_buffer_meta *meta = cpu_buffer->meta_page;
+	unsigned int nr_data_pages = cpu_buffer->nr_pages + 1;
+	struct buffer_page *first_page, *bpage;
+	int id = 0;
+
+	page_ids[id] = (unsigned long)cpu_buffer->reader_page->page;
+	cpu_buffer->reader_page->id = id++;
+
+	first_page = bpage = rb_set_head_page(cpu_buffer);
+	do {
+		if (id >= nr_data_pages) {
+			WARN_ON(1);
+			break;
+		}
+
+		page_ids[id] = (unsigned long)bpage->page;
+		bpage->id = id;
+
+		rb_inc_page(&bpage);
+		id++;
+	} while (bpage != first_page);
+
+	/* install page ID to kern VA translation */
+	cpu_buffer->page_ids = page_ids;
+
+	meta->meta_page_size = PAGE_SIZE;
+	meta->nr_data_pages = nr_data_pages;
+	meta->reader_page.id = cpu_buffer->reader_page->id;
+	rb_reset_meta_page(cpu_buffer);
+}
+
+static inline struct ring_buffer_per_cpu *
+rb_get_mapped_buffer(struct trace_buffer *buffer, int cpu)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+
+	if (!cpumask_test_cpu(cpu, buffer->cpumask))
+		return ERR_PTR(-EINVAL);
+
+	cpu_buffer = buffer->buffers[cpu];
+
+	mutex_lock(&cpu_buffer->mapping_lock);
+
+	if (!cpu_buffer->mapped) {
+		mutex_unlock(&cpu_buffer->mapping_lock);
+		return ERR_PTR(-ENODEV);
+	}
+
+	return cpu_buffer;
+}
+
+static inline void rb_put_mapped_buffer(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	mutex_unlock(&cpu_buffer->mapping_lock);
+}
+
+int ring_buffer_map(struct trace_buffer *buffer, int cpu)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+	unsigned long flags, *page_ids;
+	int err = 0;
+
+	if (!cpumask_test_cpu(cpu, buffer->cpumask))
+		return -EINVAL;
+
+	cpu_buffer = buffer->buffers[cpu];
+
+	mutex_lock(&cpu_buffer->mapping_lock);
+
+	if (cpu_buffer->mapped) {
+		WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped + 1);
+		goto unlock;
+	}
+
+	/* prevent another thread from changing buffer sizes */
+	mutex_lock(&buffer->mutex);
+	atomic_inc(&cpu_buffer->resize_disabled);
+	mutex_unlock(&buffer->mutex);
+
+	err = rb_alloc_meta_page(cpu_buffer);
+	if (err) {
+		atomic_dec(&cpu_buffer->resize_disabled);
+		goto unlock;
+	}
+
+	/* page_ids include the reader page while nr_pages does not */
+	page_ids = kzalloc(sizeof(*page_ids) * (cpu_buffer->nr_pages + 1),
+			   GFP_KERNEL);
+	if (!page_ids) {
+		rb_free_meta_page(cpu_buffer);
+		atomic_dec(&cpu_buffer->resize_disabled);
+		err = -ENOMEM;
+		goto unlock;
+	}
+
+	/*
+	 * Lock all readers to block any page swap until the page IDs are
+	 * assigned.
+	 */
+	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+
+	rb_setup_ids_meta_page(cpu_buffer, page_ids);
+	/*
+	 * Ensure the writer will observe the meta-page before
+	 * cpu_buffer->mapped.
+	 */
+	smp_wmb();
+	WRITE_ONCE(cpu_buffer->mapped, 1);
+
+	/* Init meta_page values unless the writer did it already */
+	cmpxchg(&cpu_buffer->meta_page->entries, 0,
+		local_read(&cpu_buffer->entries));
+	cmpxchg(&cpu_buffer->meta_page->overrun, 0,
+		local_read(&cpu_buffer->overrun));
+	cmpxchg(&cpu_buffer->meta_page->pages_touched, 0,
+		local_read(&cpu_buffer->pages_touched));
+	cmpxchg(&cpu_buffer->meta_page->pages_lost, 0,
+		local_read(&cpu_buffer->pages_lost));
+
+	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+unlock:
+	mutex_unlock(&cpu_buffer->mapping_lock);
+
+	return err;
+}
+
+int ring_buffer_unmap(struct trace_buffer *buffer, int cpu)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+	int err = 0;
+
+	if (!cpumask_test_cpu(cpu, buffer->cpumask))
+		return -EINVAL;
+
+	cpu_buffer = buffer->buffers[cpu];
+
+	mutex_lock(&cpu_buffer->mapping_lock);
+
+	if (!cpu_buffer->mapped) {
+		err = -ENODEV;
+		goto unlock;
+	}
+
+	WRITE_ONCE(cpu_buffer->mapped, cpu_buffer->mapped - 1);
+	if (!cpu_buffer->mapped) {
+		/* Wait the writer and readers to observe !mapped */
+		synchronize_rcu();
+
+		rb_free_page_ids(cpu_buffer);
+		rb_free_meta_page(cpu_buffer);
+		atomic_dec(&cpu_buffer->resize_disabled);
+	}
+
+unlock:
+	mutex_unlock(&cpu_buffer->mapping_lock);
+
+	return err;
+}
+
+/*
+ *   +--------------+
+ *   |   meta page  |  pgoff=0
+ *   +--------------+
+ *   |  data page1  |  page_ids=0
+ *   +--------------+
+ *   |  data page2  |  page_ids=1
+ *         ...
+ */
+struct page *ring_buffer_map_fault(struct trace_buffer *buffer, int cpu,
+				   unsigned long pgoff)
+{
+	struct ring_buffer_per_cpu *cpu_buffer = buffer->buffers[cpu];
+
+	if (!pgoff)
+		return virt_to_page((void *)cpu_buffer->meta_page);
+
+	pgoff--;
+	if (pgoff > cpu_buffer->nr_pages)
+		return NULL;
+
+	return virt_to_page(cpu_buffer->page_ids[pgoff]);
+}
+
+int ring_buffer_map_get_reader_page(struct trace_buffer *buffer, int cpu)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+	unsigned long reader_size, flags;
+
+	cpu_buffer = rb_get_mapped_buffer(buffer, cpu);
+	if (IS_ERR(cpu_buffer))
+		return (int)PTR_ERR(cpu_buffer);
+
+	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+consume:
+	if (rb_per_cpu_empty(cpu_buffer))
+		goto out;
+	reader_size = rb_page_size(cpu_buffer->reader_page);
+	if (cpu_buffer->reader_page->read < reader_size) {
+		while (cpu_buffer->reader_page->read < reader_size)
+			rb_advance_reader(cpu_buffer);
+		goto out;
+	}
+
+	if (WARN_ON(!rb_get_reader_page(cpu_buffer)))
+		goto out;
+
+	goto consume;
+out:
+	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+	rb_put_mapped_buffer(cpu_buffer);
+
+	return 0;
+}
+
 /*
  * We only allocate new buffers, never free them if the CPU goes down.
  * If we were to free the buffer, then the user would lose any trace that was in