diff mbox series

[02/13] ring-buffer: Introducing ring-buffer writer

Message ID 20240911093029.3279154-3-vdonnefort@google.com (mailing list archive)
State New
Delegated to: Steven Rostedt
Headers show
Series Tracefs support for pKVM | expand

Commit Message

Vincent Donnefort Sept. 11, 2024, 9:30 a.m. UTC
A ring-buffer writer is an entity outside of the kernel (most likely a
firmware or a hypervisor) capable of writing events in a ring-buffer
following the same format as the tracefs ring-buffer.

To setup the ring-buffer on the kernel side, a description of the pages
(struct trace_page_desc) is necessary. A callback (get_reader_page) must
also be provided. It is called whenever the kernel is done reading the
previous reader page and needs the writer to hand over a new one.

It is expected from the writer to keep the meta-page updated.

Signed-off-by: Vincent Donnefort <vdonnefort@google.com>

Comments

kernel test robot Sept. 12, 2024, 12:55 p.m. UTC | #1
Hi Vincent,

kernel test robot noticed the following build warnings:

[auto build test WARNING on 8d8d276ba2fb5f9ac4984f5c10ae60858090babc]

url:    https://github.com/intel-lab-lkp/linux/commits/Vincent-Donnefort/ring-buffer-Check-for-empty-ring-buffer-with-rb_num_of_entries/20240911-173526
base:   8d8d276ba2fb5f9ac4984f5c10ae60858090babc
patch link:    https://lore.kernel.org/r/20240911093029.3279154-3-vdonnefort%40google.com
patch subject: [PATCH 02/13] ring-buffer: Introducing ring-buffer writer
config: x86_64-buildonly-randconfig-005-20240912 (https://download.01.org/0day-ci/archive/20240912/202409122021.A3dHcn9b-lkp@intel.com/config)
compiler: gcc-12 (Debian 12.2.0-14) 12.2.0
reproduce (this is a W=1 build): (https://download.01.org/0day-ci/archive/20240912/202409122021.A3dHcn9b-lkp@intel.com/reproduce)

If you fix the issue in a separate patch/commit (i.e. not just a new version of
the same patch/commit), kindly add following tags
| Reported-by: kernel test robot <lkp@intel.com>
| Closes: https://lore.kernel.org/oe-kbuild-all/202409122021.A3dHcn9b-lkp@intel.com/

All warnings (new ones prefixed by >>):

>> kernel/trace/ring_buffer.c:1755: warning: Function parameter or struct member 'writer' not described in '__ring_buffer_alloc'


vim +1755 kernel/trace/ring_buffer.c

7a8e76a3829f10 Steven Rostedt              2008-09-29  1740  
7a8e76a3829f10 Steven Rostedt              2008-09-29  1741  /**
d611851b421731 zhangwei(Jovi               2013-07-15  1742)  * __ring_buffer_alloc - allocate a new ring_buffer
68814b58c52077 Robert Richter              2008-11-24  1743   * @size: the size in bytes per cpu that is needed.
7a8e76a3829f10 Steven Rostedt              2008-09-29  1744   * @flags: attributes to set for the ring buffer.
59e7cffe5cca6f Fabian Frederick            2014-06-05  1745   * @key: ring buffer reader_lock_key.
7a8e76a3829f10 Steven Rostedt              2008-09-29  1746   *
7a8e76a3829f10 Steven Rostedt              2008-09-29  1747   * Currently the only flag that is available is the RB_FL_OVERWRITE
7a8e76a3829f10 Steven Rostedt              2008-09-29  1748   * flag. This flag means that the buffer will overwrite old data
7a8e76a3829f10 Steven Rostedt              2008-09-29  1749   * when the buffer wraps. If this flag is not set, the buffer will
7a8e76a3829f10 Steven Rostedt              2008-09-29  1750   * drop data when the tail hits the head.
7a8e76a3829f10 Steven Rostedt              2008-09-29  1751   */
13292494379f92 Steven Rostedt (VMware      2019-12-13  1752) struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
5f3fc4f452e3d3 Vincent Donnefort           2024-09-11  1753  					struct lock_class_key *key,
5f3fc4f452e3d3 Vincent Donnefort           2024-09-11  1754  					struct ring_buffer_writer *writer)
7a8e76a3829f10 Steven Rostedt              2008-09-29 @1755  {
13292494379f92 Steven Rostedt (VMware      2019-12-13  1756) 	struct trace_buffer *buffer;
9b94a8fba501f3 Steven Rostedt (Red Hat     2016-05-12  1757) 	long nr_pages;
7a8e76a3829f10 Steven Rostedt              2008-09-29  1758  	int bsize;
9b94a8fba501f3 Steven Rostedt (Red Hat     2016-05-12  1759) 	int cpu;
b32614c03413f8 Sebastian Andrzej Siewior   2016-11-27  1760  	int ret;
7a8e76a3829f10 Steven Rostedt              2008-09-29  1761  
7a8e76a3829f10 Steven Rostedt              2008-09-29  1762  	/* keep it in its own cache line */
7a8e76a3829f10 Steven Rostedt              2008-09-29  1763  	buffer = kzalloc(ALIGN(sizeof(*buffer), cache_line_size()),
7a8e76a3829f10 Steven Rostedt              2008-09-29  1764  			 GFP_KERNEL);
7a8e76a3829f10 Steven Rostedt              2008-09-29  1765  	if (!buffer)
7a8e76a3829f10 Steven Rostedt              2008-09-29  1766  		return NULL;
7a8e76a3829f10 Steven Rostedt              2008-09-29  1767  
b18cc3de00ec34 Sebastian Andrzej Siewior   2016-12-07  1768  	if (!zalloc_cpumask_var(&buffer->cpumask, GFP_KERNEL))
9e01c1b74c9531 Rusty Russell               2009-01-01  1769  		goto fail_free_buffer;
9e01c1b74c9531 Rusty Russell               2009-01-01  1770  
139f84002145d8 Tzvetomir Stoyanov (VMware  2023-12-19  1771) 	/* Default buffer page size - one system page */
f9b94daa542a8d Tzvetomir Stoyanov (VMware  2023-12-19  1772) 	buffer->subbuf_order = 0;
139f84002145d8 Tzvetomir Stoyanov (VMware  2023-12-19  1773) 	buffer->subbuf_size = PAGE_SIZE - BUF_PAGE_HDR_SIZE;
139f84002145d8 Tzvetomir Stoyanov (VMware  2023-12-19  1774) 
139f84002145d8 Tzvetomir Stoyanov (VMware  2023-12-19  1775) 	/* Max payload is buffer page size - header (8bytes) */
139f84002145d8 Tzvetomir Stoyanov (VMware  2023-12-19  1776) 	buffer->max_data_size = buffer->subbuf_size - (sizeof(u32) * 2);
139f84002145d8 Tzvetomir Stoyanov (VMware  2023-12-19  1777) 
139f84002145d8 Tzvetomir Stoyanov (VMware  2023-12-19  1778) 	nr_pages = DIV_ROUND_UP(size, buffer->subbuf_size);
7a8e76a3829f10 Steven Rostedt              2008-09-29  1779  	buffer->flags = flags;
37886f6a9f62d2 Steven Rostedt              2009-03-17  1780  	buffer->clock = trace_clock_local;
1f8a6a10fb9437 Peter Zijlstra              2009-06-08  1781  	buffer->reader_lock_key = key;
5f3fc4f452e3d3 Vincent Donnefort           2024-09-11  1782  	if (writer) {
5f3fc4f452e3d3 Vincent Donnefort           2024-09-11  1783  		buffer->writer = writer;
5f3fc4f452e3d3 Vincent Donnefort           2024-09-11  1784  		/* The writer is external and never done by the kernel */
5f3fc4f452e3d3 Vincent Donnefort           2024-09-11  1785  		atomic_inc(&buffer->record_disabled);
5f3fc4f452e3d3 Vincent Donnefort           2024-09-11  1786  	}
7a8e76a3829f10 Steven Rostedt              2008-09-29  1787  
15693458c4bc06 Steven Rostedt (Red Hat     2013-02-28  1788) 	init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
f1dc6725882b5c Steven Rostedt (Red Hat     2013-03-04  1789) 	init_waitqueue_head(&buffer->irq_work.waiters);
15693458c4bc06 Steven Rostedt (Red Hat     2013-02-28  1790) 
7a8e76a3829f10 Steven Rostedt              2008-09-29  1791  	/* need at least two pages */
438ced1720b584 Vaibhav Nagarnaik           2012-02-02  1792  	if (nr_pages < 2)
438ced1720b584 Vaibhav Nagarnaik           2012-02-02  1793  		nr_pages = 2;
7a8e76a3829f10 Steven Rostedt              2008-09-29  1794  
7a8e76a3829f10 Steven Rostedt              2008-09-29  1795  	buffer->cpus = nr_cpu_ids;
7a8e76a3829f10 Steven Rostedt              2008-09-29  1796  
7a8e76a3829f10 Steven Rostedt              2008-09-29  1797  	bsize = sizeof(void *) * nr_cpu_ids;
7a8e76a3829f10 Steven Rostedt              2008-09-29  1798  	buffer->buffers = kzalloc(ALIGN(bsize, cache_line_size()),
7a8e76a3829f10 Steven Rostedt              2008-09-29  1799  				  GFP_KERNEL);
7a8e76a3829f10 Steven Rostedt              2008-09-29  1800  	if (!buffer->buffers)
9e01c1b74c9531 Rusty Russell               2009-01-01  1801  		goto fail_free_cpumask;
7a8e76a3829f10 Steven Rostedt              2008-09-29  1802  
b32614c03413f8 Sebastian Andrzej Siewior   2016-11-27  1803  	cpu = raw_smp_processor_id();
b32614c03413f8 Sebastian Andrzej Siewior   2016-11-27  1804  	cpumask_set_cpu(cpu, buffer->cpumask);
b32614c03413f8 Sebastian Andrzej Siewior   2016-11-27  1805  	buffer->buffers[cpu] = rb_allocate_cpu_buffer(buffer, nr_pages, cpu);
7a8e76a3829f10 Steven Rostedt              2008-09-29  1806  	if (!buffer->buffers[cpu])
7a8e76a3829f10 Steven Rostedt              2008-09-29  1807  		goto fail_free_buffers;
7a8e76a3829f10 Steven Rostedt              2008-09-29  1808  
b32614c03413f8 Sebastian Andrzej Siewior   2016-11-27  1809  	ret = cpuhp_state_add_instance(CPUHP_TRACE_RB_PREPARE, &buffer->node);
b32614c03413f8 Sebastian Andrzej Siewior   2016-11-27  1810  	if (ret < 0)
b32614c03413f8 Sebastian Andrzej Siewior   2016-11-27  1811  		goto fail_free_buffers;
554f786e284a6c Steven Rostedt              2009-03-11  1812  
7a8e76a3829f10 Steven Rostedt              2008-09-29  1813  	mutex_init(&buffer->mutex);
7a8e76a3829f10 Steven Rostedt              2008-09-29  1814  
7a8e76a3829f10 Steven Rostedt              2008-09-29  1815  	return buffer;
7a8e76a3829f10 Steven Rostedt              2008-09-29  1816  
7a8e76a3829f10 Steven Rostedt              2008-09-29  1817   fail_free_buffers:
7a8e76a3829f10 Steven Rostedt              2008-09-29  1818  	for_each_buffer_cpu(buffer, cpu) {
7a8e76a3829f10 Steven Rostedt              2008-09-29  1819  		if (buffer->buffers[cpu])
7a8e76a3829f10 Steven Rostedt              2008-09-29  1820  			rb_free_cpu_buffer(buffer->buffers[cpu]);
7a8e76a3829f10 Steven Rostedt              2008-09-29  1821  	}
7a8e76a3829f10 Steven Rostedt              2008-09-29  1822  	kfree(buffer->buffers);
7a8e76a3829f10 Steven Rostedt              2008-09-29  1823  
9e01c1b74c9531 Rusty Russell               2009-01-01  1824   fail_free_cpumask:
9e01c1b74c9531 Rusty Russell               2009-01-01  1825  	free_cpumask_var(buffer->cpumask);
9e01c1b74c9531 Rusty Russell               2009-01-01  1826  
7a8e76a3829f10 Steven Rostedt              2008-09-29  1827   fail_free_buffer:
7a8e76a3829f10 Steven Rostedt              2008-09-29  1828  	kfree(buffer);
7a8e76a3829f10 Steven Rostedt              2008-09-29  1829  	return NULL;
7a8e76a3829f10 Steven Rostedt              2008-09-29  1830  }
1f8a6a10fb9437 Peter Zijlstra              2009-06-08  1831  EXPORT_SYMBOL_GPL(__ring_buffer_alloc);
7a8e76a3829f10 Steven Rostedt              2008-09-29  1832
diff mbox series

Patch

diff --git a/include/linux/ring_buffer.h b/include/linux/ring_buffer.h
index fd35d4ec12e1..d78a33b3c96e 100644
--- a/include/linux/ring_buffer.h
+++ b/include/linux/ring_buffer.h
@@ -83,21 +83,24 @@  u64 ring_buffer_event_time_stamp(struct trace_buffer *buffer,
 void ring_buffer_discard_commit(struct trace_buffer *buffer,
 				struct ring_buffer_event *event);
 
+struct ring_buffer_writer;
+
 /*
  * size is in bytes for each per CPU buffer.
  */
 struct trace_buffer *
-__ring_buffer_alloc(unsigned long size, unsigned flags, struct lock_class_key *key);
+__ring_buffer_alloc(unsigned long size, unsigned flags, struct lock_class_key *key,
+		    struct ring_buffer_writer *writer);
 
 /*
  * Because the ring buffer is generic, if other users of the ring buffer get
  * traced by ftrace, it can produce lockdep warnings. We need to keep each
  * ring buffer's lock class separate.
  */
-#define ring_buffer_alloc(size, flags)			\
-({							\
-	static struct lock_class_key __key;		\
-	__ring_buffer_alloc((size), (flags), &__key);	\
+#define ring_buffer_alloc(size, flags)				\
+({								\
+	static struct lock_class_key __key;			\
+	__ring_buffer_alloc((size), (flags), &__key, NULL);	\
 })
 
 typedef bool (*ring_buffer_cond_fn)(void *data);
@@ -228,4 +231,54 @@  int ring_buffer_map(struct trace_buffer *buffer, int cpu,
 		    struct vm_area_struct *vma);
 int ring_buffer_unmap(struct trace_buffer *buffer, int cpu);
 int ring_buffer_map_get_reader(struct trace_buffer *buffer, int cpu);
+
+#define meta_pages_lost(__meta) \
+	((__meta)->Reserved1)
+#define meta_pages_touched(__meta) \
+	((__meta)->Reserved2)
+
+struct rb_page_desc {
+	int		cpu;
+	int		nr_page_va; /* exclude the meta page */
+	unsigned long	meta_va;
+	unsigned long	page_va[];
+};
+
+struct trace_page_desc {
+	int		nr_cpus;
+	char		__data[]; /* list of rb_page_desc */
+};
+
+static inline
+struct rb_page_desc *__next_rb_page_desc(struct rb_page_desc *pdesc)
+{
+	size_t len = struct_size(pdesc, page_va, pdesc->nr_page_va);
+
+	return (struct rb_page_desc *)((void *)pdesc + len);
+}
+
+static inline
+struct rb_page_desc *__first_rb_page_desc(struct trace_page_desc *trace_pdesc)
+{
+	return (struct rb_page_desc *)(&trace_pdesc->__data[0]);
+}
+
+#define for_each_rb_page_desc(__pdesc, __cpu, __trace_pdesc)		\
+	for (__pdesc = __first_rb_page_desc(__trace_pdesc), __cpu = 0;	\
+	     __cpu < (__trace_pdesc)->nr_cpus;				\
+	     __cpu++, __pdesc = __next_rb_page_desc(__pdesc))
+
+struct ring_buffer_writer {
+	struct trace_page_desc	*pdesc;
+	int			(*get_reader_page)(int cpu);
+	int			(*reset)(int cpu);
+};
+
+int ring_buffer_poll_writer(struct trace_buffer *buffer, int cpu);
+
+#define ring_buffer_reader(writer)				\
+({								\
+	static struct lock_class_key __key;			\
+	__ring_buffer_alloc(0, RB_FL_OVERWRITE, &__key, writer);\
+})
 #endif /* _LINUX_RING_BUFFER_H */
diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index 7abe671effbf..b05b7a95e3f1 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -495,6 +495,8 @@  struct ring_buffer_per_cpu {
 	unsigned long			*subbuf_ids;	/* ID to subbuf VA */
 	struct trace_buffer_meta	*meta_page;
 
+	struct ring_buffer_writer	*writer;
+
 	/* ring buffer pages to update, > 0 to add, < 0 to remove */
 	long				nr_pages_to_update;
 	struct list_head		new_pages; /* new pages to add */
@@ -517,6 +519,8 @@  struct trace_buffer {
 
 	struct ring_buffer_per_cpu	**buffers;
 
+	struct ring_buffer_writer	*writer;
+
 	struct hlist_node		node;
 	u64				(*clock)(void);
 
@@ -1555,6 +1559,42 @@  static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
 	return -ENOMEM;
 }
 
+static struct rb_page_desc *rb_page_desc(struct trace_page_desc *trace_pdesc,
+					 int cpu)
+{
+	struct rb_page_desc *pdesc;
+	size_t len;
+	int i;
+
+	if (!trace_pdesc)
+		return NULL;
+
+	if (cpu < 0 || cpu >= trace_pdesc->nr_cpus)
+		return NULL;
+
+	pdesc = __first_rb_page_desc(trace_pdesc);
+	len = struct_size(pdesc, page_va, pdesc->nr_page_va);
+	pdesc = (struct rb_page_desc *)((void *)pdesc + len * cpu); /* len is in bytes */
+
+	if (pdesc->cpu == cpu)
+		return pdesc;
+
+	/* Missing CPUs, need to linear search */
+
+	for_each_rb_page_desc(pdesc, i, trace_pdesc) {
+		if (pdesc->cpu == cpu)
+			return pdesc;
+	}
+
+	return NULL;
+}
+
+static void *rb_page_desc_page(struct rb_page_desc *pdesc, int page_id)
+{
+	return (page_id < 0 || page_id >= pdesc->nr_page_va) ? NULL : (void *)pdesc->page_va[page_id];
+}
+
+
 static int rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
 			     unsigned long nr_pages)
 {
@@ -1614,6 +1654,31 @@  rb_allocate_cpu_buffer(struct trace_buffer *buffer, long nr_pages, int cpu)
 
 	cpu_buffer->reader_page = bpage;
 
+	if (buffer->writer) {
+		struct rb_page_desc *pdesc = rb_page_desc(buffer->writer->pdesc, cpu);
+
+		if (!pdesc)
+			goto fail_free_reader;
+
+		cpu_buffer->writer = buffer->writer;
+		cpu_buffer->meta_page = (struct trace_buffer_meta *)(void *)pdesc->meta_va;
+		cpu_buffer->subbuf_ids = pdesc->page_va;
+		cpu_buffer->nr_pages = pdesc->nr_page_va - 1;
+		atomic_inc(&cpu_buffer->record_disabled);
+		atomic_inc(&cpu_buffer->resize_disabled);
+
+		bpage->page = rb_page_desc_page(pdesc,
+						cpu_buffer->meta_page->reader.id);
+		if (!bpage->page)
+			goto fail_free_reader;
+		/*
+		 * The meta-page can only describe which of the ring-buffer page
+		 * is the reader. There is no need to init the rest of the
+		 * ring-buffer.
+		 */
+		return cpu_buffer;
+	}
+
 	page = alloc_pages_node(cpu_to_node(cpu), GFP_KERNEL | __GFP_COMP | __GFP_ZERO,
 				cpu_buffer->buffer->subbuf_order);
 	if (!page)
@@ -1651,6 +1716,10 @@  static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
 
 	irq_work_sync(&cpu_buffer->irq_work.work);
 
+	/* ring_buffers with writer set do not own the data pages */
+	if (cpu_buffer->writer)
+		cpu_buffer->reader_page->page = NULL;
+
 	free_buffer_page(cpu_buffer->reader_page);
 
 	if (head) {
@@ -1681,7 +1750,10 @@  static void rb_free_cpu_buffer(struct ring_buffer_per_cpu *cpu_buffer)
  * drop data when the tail hits the head.
+ *
+ * @writer: an external writer (e.g. hypervisor or firmware) for this
+ * ring-buffer, or NULL if the kernel writes it as usual.
  */
 struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
-					struct lock_class_key *key)
+					struct lock_class_key *key,
+					struct ring_buffer_writer *writer)
 {
 	struct trace_buffer *buffer;
 	long nr_pages;
@@ -1709,6 +1779,11 @@  struct trace_buffer *__ring_buffer_alloc(unsigned long size, unsigned flags,
 	buffer->flags = flags;
 	buffer->clock = trace_clock_local;
 	buffer->reader_lock_key = key;
+	if (writer) {
+		buffer->writer = writer;
+		/* The writer is external and never done by the kernel */
+		atomic_inc(&buffer->record_disabled);
+	}
 
 	init_irq_work(&buffer->irq_work.work, rb_wake_up_waiters);
 	init_waitqueue_head(&buffer->irq_work.waiters);
@@ -4456,8 +4531,54 @@  rb_update_iter_read_stamp(struct ring_buffer_iter *iter,
 	}
 }
 
+static bool rb_read_writer_meta_page(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	local_set(&cpu_buffer->entries, READ_ONCE(cpu_buffer->meta_page->entries));
+	local_set(&cpu_buffer->overrun, READ_ONCE(cpu_buffer->meta_page->overrun));
+	local_set(&cpu_buffer->pages_touched, READ_ONCE(meta_pages_touched(cpu_buffer->meta_page)));
+	local_set(&cpu_buffer->pages_lost, READ_ONCE(meta_pages_lost(cpu_buffer->meta_page)));
+	/*
+	 * No need to get the "read" field, it can be tracked here as any
+	 * reader will have to go through a ring_buffer_per_cpu.
+	 */
+
+	return rb_num_of_entries(cpu_buffer);
+}
+
 static struct buffer_page *
-rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
+__rb_get_reader_page_from_writer(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	u32 prev_reader;
+
+	if (!rb_read_writer_meta_page(cpu_buffer))
+		return NULL;
+
+	/* More to read on the reader page */
+	if (cpu_buffer->reader_page->read < rb_page_size(cpu_buffer->reader_page))
+		return cpu_buffer->reader_page;
+
+	prev_reader = cpu_buffer->meta_page->reader.id;
+
+	WARN_ON(cpu_buffer->writer->get_reader_page(cpu_buffer->cpu));
+	/* nr_pages doesn't include the reader page */
+	if (cpu_buffer->meta_page->reader.id > cpu_buffer->nr_pages) {
+		WARN_ON(1);
+		return NULL;
+	}
+
+	cpu_buffer->reader_page->page =
+		(void *)cpu_buffer->subbuf_ids[cpu_buffer->meta_page->reader.id];
+	cpu_buffer->reader_page->read = 0;
+	cpu_buffer->read_stamp = cpu_buffer->reader_page->page->time_stamp;
+	cpu_buffer->lost_events = cpu_buffer->meta_page->reader.lost_events;
+
+	WARN_ON(prev_reader == cpu_buffer->meta_page->reader.id);
+
+	return cpu_buffer->reader_page;
+}
+
+static struct buffer_page *
+__rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
 {
 	struct buffer_page *reader = NULL;
 	unsigned long bsize = READ_ONCE(cpu_buffer->buffer->subbuf_size);
@@ -4624,6 +4745,13 @@  rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
 	return reader;
 }
 
+static struct buffer_page *
+rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
+{
+	return cpu_buffer->writer ? __rb_get_reader_page_from_writer(cpu_buffer) :
+				    __rb_get_reader_page(cpu_buffer);
+}
+
 static void rb_advance_reader(struct ring_buffer_per_cpu *cpu_buffer)
 {
 	struct ring_buffer_event *event;
@@ -5028,7 +5156,7 @@  ring_buffer_read_prepare(struct trace_buffer *buffer, int cpu, gfp_t flags)
 	struct ring_buffer_per_cpu *cpu_buffer;
 	struct ring_buffer_iter *iter;
 
-	if (!cpumask_test_cpu(cpu, buffer->cpumask))
+	if (!cpumask_test_cpu(cpu, buffer->cpumask) || buffer->writer)
 		return NULL;
 
 	iter = kzalloc(sizeof(*iter), flags);
@@ -5198,6 +5326,22 @@  rb_reset_cpu(struct ring_buffer_per_cpu *cpu_buffer)
 {
 	struct buffer_page *page;
 
+	if (cpu_buffer->writer) {
+		if (!cpu_buffer->writer->reset)
+			return;
+
+		cpu_buffer->writer->reset(cpu_buffer->cpu);
+		rb_read_writer_meta_page(cpu_buffer);
+
+		/* Read related values, not covered by the meta-page */
+		local_set(&cpu_buffer->pages_read, 0);
+		cpu_buffer->read = 0;
+		cpu_buffer->read_bytes = 0;
+		cpu_buffer->last_overrun = 0;
+
+		return;
+	}
+
 	rb_head_page_deactivate(cpu_buffer);
 
 	cpu_buffer->head_page
@@ -5428,6 +5572,49 @@  bool ring_buffer_empty_cpu(struct trace_buffer *buffer, int cpu)
 }
 EXPORT_SYMBOL_GPL(ring_buffer_empty_cpu);
 
+int ring_buffer_poll_writer(struct trace_buffer *buffer, int cpu)
+{
+	struct ring_buffer_per_cpu *cpu_buffer;
+	unsigned long flags;
+
+	if (cpu != RING_BUFFER_ALL_CPUS) {
+		if (!cpumask_test_cpu(cpu, buffer->cpumask))
+			return -EINVAL;
+
+		cpu_buffer = buffer->buffers[cpu];
+
+		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+		if (rb_read_writer_meta_page(cpu_buffer))
+			rb_wakeups(buffer, cpu_buffer);
+		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+
+		return 0;
+	}
+
+	/*
+	 * Make sure all the ring buffers are up to date before we start reading
+	 * them.
+	 */
+	for_each_buffer_cpu(buffer, cpu) {
+		cpu_buffer = buffer->buffers[cpu];
+
+		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+		rb_read_writer_meta_page(buffer->buffers[cpu]);
+		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+	}
+
+	for_each_buffer_cpu(buffer, cpu) {
+		cpu_buffer = buffer->buffers[cpu];
+
+		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+		if (rb_num_of_entries(cpu_buffer))
+			rb_wakeups(buffer, buffer->buffers[cpu]);
+		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+	}
+
+	return 0;
+}
+
 #ifdef CONFIG_RING_BUFFER_ALLOW_SWAP
 /**
  * ring_buffer_swap_cpu - swap a CPU buffer between two ring buffers
@@ -5679,6 +5866,7 @@  int ring_buffer_read_page(struct trace_buffer *buffer,
 	unsigned int commit;
 	unsigned int read;
 	u64 save_timestamp;
+	bool force_memcpy;
 	int ret = -1;
 
 	if (!cpumask_test_cpu(cpu, buffer->cpumask))
@@ -5716,6 +5904,8 @@  int ring_buffer_read_page(struct trace_buffer *buffer,
 	/* Check if any events were dropped */
 	missed_events = cpu_buffer->lost_events;
 
+	force_memcpy = cpu_buffer->mapped || cpu_buffer->writer;
+
 	/*
 	 * If this page has been partially read or
 	 * if len is not big enough to read the rest of the page or
@@ -5725,7 +5915,7 @@  int ring_buffer_read_page(struct trace_buffer *buffer,
 	 */
 	if (read || (len < (commit - read)) ||
 	    cpu_buffer->reader_page == cpu_buffer->commit_page ||
-	    cpu_buffer->mapped) {
+	    force_memcpy) {
 		struct buffer_data_page *rpage = cpu_buffer->reader_page->page;
 		unsigned int rpos = read;
 		unsigned int pos = 0;
@@ -6278,7 +6468,7 @@  int ring_buffer_map(struct trace_buffer *buffer, int cpu,
 	unsigned long flags, *subbuf_ids;
 	int err = 0;
 
-	if (!cpumask_test_cpu(cpu, buffer->cpumask))
+	if (!cpumask_test_cpu(cpu, buffer->cpumask) || buffer->writer)
 		return -EINVAL;
 
 	cpu_buffer = buffer->buffers[cpu];