diff mbox series

[v2] ring-buffer: Limit time with disabled interrupts in rb_check_pages()

Message ID 20240703075314.23511-1-petr.pavlu@suse.com (mailing list archive)
State Superseded
Headers show
Series [v2] ring-buffer: Limit time with disabled interrupts in rb_check_pages() | expand

Commit Message

Petr Pavlu July 3, 2024, 7:53 a.m. UTC
The function rb_check_pages() validates the integrity of a specified
per-CPU tracing ring buffer. It does so by traversing the underlying
linked list and checking its next and prev links.

To guarantee that the list isn't modified during the check, a caller
typically needs to take cpu_buffer->reader_lock. This prevents the check
from running concurrently, for example, with a potential reader which
can make the list temporarily inconsistent when swapping its old reader
page into the buffer.

A problem with this approach is that the time when interrupts are
disabled is non-deterministic, dependent on the ring buffer size. This
particularly affects PREEMPT_RT because the reader_lock is a raw
spinlock which doesn't become sleepable on PREEMPT_RT kernels.

Modify the check so it still attempts to traverse the entire list, but
gives up the reader_lock between checking individual pages. Introduce
for this purpose a new variable ring_buffer_per_cpu.pages_era which is
bumped any time the list is modified. The value is used by
rb_check_pages() to detect such a change and restart the check.

Signed-off-by: Petr Pavlu <petr.pavlu@suse.com>
---

This is a follow-up to the discussion about improving this check [1].

Changes since v1 [2]:
* Correct the case when rb_check_pages() is invoked concurrently with
  the resize code which modifies the list. Introduce
  ring_buffer_per_cpu.pages_era for this purpose.

[1] https://lore.kernel.org/linux-trace-kernel/20240517134008.24529-1-petr.pavlu@suse.com/
[2] https://lore.kernel.org/linux-trace-kernel/20240621150956.24814-1-petr.pavlu@suse.com/

 kernel/trace/ring_buffer.c | 97 ++++++++++++++++++++++++++++----------
 1 file changed, 71 insertions(+), 26 deletions(-)


base-commit: 1dfe225e9af5bd3399a1dbc6a4df6a6041ff9c23

Comments

Steven Rostedt July 3, 2024, 9:44 p.m. UTC | #1
On Wed,  3 Jul 2024 09:53:14 +0200
Petr Pavlu <petr.pavlu@suse.com> wrote:

> The function rb_check_pages() validates the integrity of a specified
> per-CPU tracing ring buffer. It does so by traversing the underlying
> linked list and checking its next and prev links.
> 
> To guarantee that the list isn't modified during the check, a caller
> typically needs to take cpu_buffer->reader_lock. This prevents the check
> from running concurrently, for example, with a potential reader which
> can make the list temporarily inconsistent when swapping its old reader
> page into the buffer.
> 
> A problem with this approach is that the time when interrupts are
> disabled is non-deterministic, dependent on the ring buffer size. This
> particularly affects PREEMPT_RT because the reader_lock is a raw
> spinlock which doesn't become sleepable on PREEMPT_RT kernels.
> 
> Modify the check so it still attempts to traverse the entire list, but
> gives up the reader_lock between checking individual pages. Introduce
> for this purpose a new variable ring_buffer_per_cpu.pages_era which is

I'm dumb. What's an "era"?

-- Steve

> bumped any time the list is modified. The value is used by
> rb_check_pages() to detect such a change and restart the check.
> 
> Signed-off-by: Petr Pavlu <petr.pavlu@suse.com>
Petr Pavlu July 4, 2024, 11:03 a.m. UTC | #2
On 7/3/24 23:44, Steven Rostedt wrote:
> On Wed,  3 Jul 2024 09:53:14 +0200
> Petr Pavlu <petr.pavlu@suse.com> wrote:
> 
>> The function rb_check_pages() validates the integrity of a specified
>> per-CPU tracing ring buffer. It does so by traversing the underlying
>> linked list and checking its next and prev links.
>>
>> To guarantee that the list isn't modified during the check, a caller
>> typically needs to take cpu_buffer->reader_lock. This prevents the check
>> from running concurrently, for example, with a potential reader which
>> can make the list temporarily inconsistent when swapping its old reader
>> page into the buffer.
>>
>> A problem with this approach is that the time when interrupts are
>> disabled is non-deterministic, dependent on the ring buffer size. This
>> particularly affects PREEMPT_RT because the reader_lock is a raw
>> spinlock which doesn't become sleepable on PREEMPT_RT kernels.
>>
>> Modify the check so it still attempts to traverse the entire list, but
>> gives up the reader_lock between checking individual pages. Introduce
>> for this purpose a new variable ring_buffer_per_cpu.pages_era which is
> 
> I'm dumb. What's an "era"?

I meant it as a calendar era or epoch. The idea was to hint this is
a number that identifies some structural state of the pages list. Maybe
pages_gen ("generation") or another name would be better?

-- Petr
Steven Rostedt July 11, 2024, 2:10 p.m. UTC | #3
On Thu, 4 Jul 2024 13:03:47 +0200
Petr Pavlu <petr.pavlu@suse.com> wrote:

> > I'm dumb. What's an "era"?  
> 
> I meant it as a calendar era or epoch. The idea was to hint this is
> a number that identifies some structural state of the pages list. Maybe
> pages_gen ("generation") or another name would be better?

Ah, out of context I thought it was short for something. Perhaps just use
"cnt" with a comment, as that can be generic enough for what it is.

Thanks,

-- Steve
diff mbox series

Patch

diff --git a/kernel/trace/ring_buffer.c b/kernel/trace/ring_buffer.c
index 28853966aa9a..ff9a98c06043 100644
--- a/kernel/trace/ring_buffer.c
+++ b/kernel/trace/ring_buffer.c
@@ -462,6 +462,7 @@  struct ring_buffer_per_cpu {
 	unsigned long			nr_pages;
 	unsigned int			current_context;
 	struct list_head		*pages;
+	unsigned long			pages_era;	/* list state era */
 	struct buffer_page		*head_page;	/* read from head */
 	struct buffer_page		*tail_page;	/* write to tail */
 	struct buffer_page		*commit_page;	/* committed pages */
@@ -1454,40 +1455,87 @@  static void rb_check_bpage(struct ring_buffer_per_cpu *cpu_buffer,
 	RB_WARN_ON(cpu_buffer, val & RB_FLAG_MASK);
 }
 
+static bool rb_check_links(struct ring_buffer_per_cpu *cpu_buffer,
+			   struct list_head *list)
+{
+	if (RB_WARN_ON(cpu_buffer,
+		       rb_list_head(rb_list_head(list->next)->prev) != list))
+		return false;
+
+	if (RB_WARN_ON(cpu_buffer,
+		       rb_list_head(rb_list_head(list->prev)->next) != list))
+		return false;
+
+	return true;
+}
+
 /**
  * rb_check_pages - integrity check of buffer pages
  * @cpu_buffer: CPU buffer with pages to test
  *
  * As a safety measure we check to make sure the data pages have not
  * been corrupted.
- *
- * Callers of this function need to guarantee that the list of pages doesn't get
- * modified during the check. In particular, if it's possible that the function
- * is invoked with concurrent readers which can swap in a new reader page then
- * the caller should take cpu_buffer->reader_lock.
  */
 static void rb_check_pages(struct ring_buffer_per_cpu *cpu_buffer)
 {
-	struct list_head *head = rb_list_head(cpu_buffer->pages);
-	struct list_head *tmp;
+	struct list_head *head, *tmp;
+	unsigned long pages_era;
+	unsigned long flags;
+	int nr_loops = 0;
 
-	if (RB_WARN_ON(cpu_buffer,
-			rb_list_head(rb_list_head(head->next)->prev) != head))
+	/*
+	 * Walk the linked list underpinning the ring buffer and validate all
+	 * its next and prev links.
+	 *
+	 * The check acquires the reader_lock to avoid concurrent processing
+	 * with code that could be modifying the list. However, the lock cannot
+	 * be held for the entire duration of the walk, as this would make the
+	 * time when interrupts are disabled non-deterministic, dependent on the
+	 * ring buffer size. Therefore, the code releases and re-acquires the
+	 * lock after checking each page. The pages_era variable is then used to
+	 * detect if the list was modified while the lock was not held, in which
+	 * case the check needs to be restarted.
+	 *
+	 * The code attempts to perform the check at most three times before
+	 * giving up. This is acceptable because this is only a self-validation
+	 * to detect problems early on. In practice, the list modification
+	 * operations are fairly spaced, and so this check typically succeeds at
+	 * most on the second try.
+	 */
+again:
+	if (++nr_loops > 3)
 		return;
 
-	if (RB_WARN_ON(cpu_buffer,
-			rb_list_head(rb_list_head(head->prev)->next) != head))
-		return;
+	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
+	head = rb_list_head(cpu_buffer->pages);
+	if (!rb_check_links(cpu_buffer, head))
+		goto out_locked;
+	pages_era = cpu_buffer->pages_era;
+	tmp = head;
+	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 
-	for (tmp = rb_list_head(head->next); tmp != head; tmp = rb_list_head(tmp->next)) {
-		if (RB_WARN_ON(cpu_buffer,
-				rb_list_head(rb_list_head(tmp->next)->prev) != tmp))
-			return;
+	while (true) {
+		raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
 
-		if (RB_WARN_ON(cpu_buffer,
-				rb_list_head(rb_list_head(tmp->prev)->next) != tmp))
-			return;
+		if (pages_era != cpu_buffer->pages_era) {
+			/* The list was updated, try again. */
+			raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
+			goto again;
+		}
+
+		tmp = rb_list_head(tmp->next);
+		if (tmp == head)
+			/* The iteration circled back, all is done. */
+			goto out_locked;
+
+		if (!rb_check_links(cpu_buffer, tmp))
+			goto out_locked;
+
+		raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 	}
+
+out_locked:
+	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 }
 
 static int __rb_allocate_pages(struct ring_buffer_per_cpu *cpu_buffer,
@@ -1874,6 +1922,7 @@  rb_remove_pages(struct ring_buffer_per_cpu *cpu_buffer, unsigned long nr_pages)
 
 	/* make sure pages points to a valid page in the ring buffer */
 	cpu_buffer->pages = next_page;
+	cpu_buffer->pages_era++;
 
 	/* update head page */
 	if (head_bit)
@@ -1980,6 +2029,7 @@  rb_insert_pages(struct ring_buffer_per_cpu *cpu_buffer)
 			 * pointer to point to end of list
 			 */
 			head_page->prev = last_page;
+			cpu_buffer->pages_era++;
 			success = true;
 			break;
 		}
@@ -2215,12 +2265,8 @@  int ring_buffer_resize(struct trace_buffer *buffer, unsigned long size,
 		 */
 		synchronize_rcu();
 		for_each_buffer_cpu(buffer, cpu) {
-			unsigned long flags;
-
 			cpu_buffer = buffer->buffers[cpu];
-			raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
 			rb_check_pages(cpu_buffer);
-			raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 		}
 		atomic_dec(&buffer->record_disabled);
 	}
@@ -4611,6 +4657,7 @@  rb_get_reader_page(struct ring_buffer_per_cpu *cpu_buffer)
 	rb_list_head(reader->list.next)->prev = &cpu_buffer->reader_page->list;
 	rb_inc_page(&cpu_buffer->head_page);
 
+	cpu_buffer->pages_era++;
 	local_inc(&cpu_buffer->pages_read);
 
 	/* Finally update the reader page to the new head */
@@ -5150,12 +5197,9 @@  void
 ring_buffer_read_finish(struct ring_buffer_iter *iter)
 {
 	struct ring_buffer_per_cpu *cpu_buffer = iter->cpu_buffer;
-	unsigned long flags;
 
 	/* Use this opportunity to check the integrity of the ring buffer. */
-	raw_spin_lock_irqsave(&cpu_buffer->reader_lock, flags);
 	rb_check_pages(cpu_buffer);
-	raw_spin_unlock_irqrestore(&cpu_buffer->reader_lock, flags);
 
 	atomic_dec(&cpu_buffer->resize_disabled);
 	kfree(iter->event);
@@ -6050,6 +6094,7 @@  int ring_buffer_subbuf_order_set(struct trace_buffer *buffer, int order)
 		cpu_buffer->pages = cpu_buffer->new_pages.next;
 		cpu_buffer->new_pages.next->prev = cpu_buffer->new_pages.prev;
 		cpu_buffer->new_pages.prev->next = cpu_buffer->new_pages.next;
+		cpu_buffer->pages_era++;
 
 		/* Clear the new_pages list */
 		INIT_LIST_HEAD(&cpu_buffer->new_pages);