diff mbox series

[v4,5/5] libfs: Use d_children list to iterate simple_offset directories

Message ID 20241204155257.1110338-6-cel@kernel.org (mailing list archive)
State New
Headers show
Series Improve simple directory offset wrap behavior | expand

Commit Message

Chuck Lever Dec. 4, 2024, 3:52 p.m. UTC
From: Chuck Lever <chuck.lever@oracle.com>

The mtree mechanism has been effective at creating directory offsets
that are stable over multiple opendir instances. However, it has not
been able to handle the subtleties of renames that are concurrent
with readdir.

Instead of using the mtree to emit entries in the order of their
offset values, use it only to map incoming ctx->pos to a starting
entry. Then use the directory's d_children list, which is already
maintained properly by the dcache, to find the next child to emit.

One of the sneaky things about this is that when the mtree-allocated
offset value wraps (which is very rare), looking up ctx->pos++ is
not going to find the next entry; it will return NULL. Instead, by
following the d_children list, the offset values can appear in any
order but all of the entries in the directory will be visited
eventually.

Note also that the readdir() is guaranteed to reach the tail of this
list. Entries are added only at the head of d_children, and readdir
walks from its current position in that list towards its tail.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 fs/libfs.c | 77 ++++++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 57 insertions(+), 20 deletions(-)

Comments

Chuck Lever Dec. 8, 2024, 5:11 p.m. UTC | #1
On 12/4/24 10:52 AM, cel@kernel.org wrote:
> From: Chuck Lever <chuck.lever@oracle.com>
> 
> The mtree mechanism has been effective at creating directory offsets
> that are stable over multiple opendir instances. However, it has not
> been able to handle the subtleties of renames that are concurrent
> with readdir.
> 
> Instead of using the mtree to emit entries in the order of their
> offset values, use it only to map incoming ctx->pos to a starting
> entry. Then use the directory's d_children list, which is already
> maintained properly by the dcache, to find the next child to emit.
> 
> One of the sneaky things about this is that when the mtree-allocated
> offset value wraps (which is very rare), looking up ctx->pos++ is
> not going to find the next entry; it will return NULL. Instead, by
> following the d_children list, the offset values can appear in any
> order but all of the entries in the directory will be visited
> eventually.
> 
> Note also that the readdir() is guaranteed to reach the tail of this
> list. Entries are added only at the head of d_children, and readdir
> walks from its current position in that list towards its tail.
> 
> Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
> ---
>   fs/libfs.c | 77 ++++++++++++++++++++++++++++++++++++++++--------------
>   1 file changed, 57 insertions(+), 20 deletions(-)
> 
> diff --git a/fs/libfs.c b/fs/libfs.c
> index fcb2cdf6e3f3..398eac385094 100644
> --- a/fs/libfs.c
> +++ b/fs/libfs.c
> @@ -243,12 +243,13 @@ EXPORT_SYMBOL(simple_dir_inode_operations);
>   
>   /* simple_offset_add() allocation range */
>   enum {
> -	DIR_OFFSET_MIN		= 2,
> +	DIR_OFFSET_MIN		= 3,
>   	DIR_OFFSET_MAX		= LONG_MAX - 1,
>   };
>   
>   /* simple_offset_add() never assigns these to a dentry */
>   enum {
> +	DIR_OFFSET_FIRST	= 2,		/* Find first real entry */
>   	DIR_OFFSET_EOD		= LONG_MAX,	/* Marks EOD */
>   
>   };
> @@ -456,19 +457,43 @@ static loff_t offset_dir_llseek(struct file *file, loff_t offset, int whence)
>   	return vfs_setpos(file, offset, LONG_MAX);
>   }
>   
> -static struct dentry *offset_find_next(struct offset_ctx *octx, loff_t offset)
> +/* Cf. find_next_child() */
> +static struct dentry *find_next_sibling_locked(struct dentry *parent,
> +					       struct dentry *dentry)

There might be a better name for this function.

It looks a lot like find_next_child(), but it acts more like
scan_positives(). It starts looking for positive dentries starting
at @dentry, thus it can return the dentry that was passed in @dentry.

find_positive_from_locked()  ??


>   {
> -	MA_STATE(mas, &octx->mt, offset, offset);
> +	struct dentry *found = NULL;
> +
> +	hlist_for_each_entry_from(dentry, d_sib) {
> +		if (!simple_positive(dentry))
> +			continue;
> +		spin_lock_nested(&dentry->d_lock, DENTRY_D_LOCK_NESTED);
> +		if (simple_positive(dentry))
> +			found = dget_dlock(dentry);
> +		spin_unlock(&dentry->d_lock);
> +		if (likely(found))
> +			break;
> +	}
> +	return found;
> +}
> +
> +static noinline_for_stack struct dentry *
> +offset_dir_lookup(struct file *file, loff_t offset)
> +{
> +	struct dentry *parent = file->f_path.dentry;
>   	struct dentry *child, *found = NULL;
> +	struct inode *inode = d_inode(parent);
> +	struct offset_ctx *octx = inode->i_op->get_offset_ctx(inode);
> +
> +	MA_STATE(mas, &octx->mt, offset, offset);
>   
>   	rcu_read_lock();
>   	child = mas_find(&mas, DIR_OFFSET_MAX);
>   	if (!child)
>   		goto out;
> -	spin_lock(&child->d_lock);
> -	if (simple_positive(child))
> -		found = dget_dlock(child);
> -	spin_unlock(&child->d_lock);
> +
> +	spin_lock(&parent->d_lock);
> +	found = find_next_sibling_locked(parent, child);
> +	spin_unlock(&parent->d_lock);
>   out:
>   	rcu_read_unlock();
>   	return found;
> @@ -477,30 +502,42 @@ static struct dentry *offset_find_next(struct offset_ctx *octx, loff_t offset)
>   static bool offset_dir_emit(struct dir_context *ctx, struct dentry *dentry)
>   {
>   	struct inode *inode = d_inode(dentry);
> -	long offset = dentry2offset(dentry);
>   
> -	return ctx->actor(ctx, dentry->d_name.name, dentry->d_name.len, offset,
> -			  inode->i_ino, fs_umode_to_dtype(inode->i_mode));
> +	return dir_emit(ctx, dentry->d_name.name, dentry->d_name.len,
> +			inode->i_ino, fs_umode_to_dtype(inode->i_mode));
>   }
>   
> -static void offset_iterate_dir(struct inode *inode, struct dir_context *ctx)
> +static void offset_iterate_dir(struct file *file, struct dir_context *ctx)
>   {
> -	struct offset_ctx *octx = inode->i_op->get_offset_ctx(inode);
> +	struct dentry *dir = file->f_path.dentry;
>   	struct dentry *dentry;
>   
> +	if (ctx->pos == DIR_OFFSET_FIRST) {
> +		spin_lock(&dir->d_lock);
> +		dentry = find_next_sibling_locked(dir, d_first_child(dir));
> +		spin_unlock(&dir->d_lock);
> +	} else
> +		dentry = offset_dir_lookup(file, ctx->pos);
> +	if (!dentry)
> +		goto out_eod;
> +
>   	while (true) {
> -		dentry = offset_find_next(octx, ctx->pos);
> -		if (!dentry)
> -			goto out_eod;
> +		struct dentry *next;
>   
> -		if (!offset_dir_emit(ctx, dentry)) {
> -			dput(dentry);
> +		ctx->pos = dentry2offset(dentry);
> +		if (!offset_dir_emit(ctx, dentry))
>   			break;
> -		}
>   
> -		ctx->pos = dentry2offset(dentry) + 1;
> +		spin_lock(&dir->d_lock);
> +		next = find_next_sibling_locked(dir, d_next_sibling(dentry));
> +		spin_unlock(&dir->d_lock);
>   		dput(dentry);
> +
> +		if (!next)
> +			goto out_eod;
> +		dentry = next;
>   	}
> +	dput(dentry);
>   	return;
>   
>   out_eod:
> @@ -539,7 +576,7 @@ static int offset_readdir(struct file *file, struct dir_context *ctx)
>   	if (!dir_emit_dots(file, ctx))
>   		return 0;
>   	if (ctx->pos != DIR_OFFSET_EOD)
> -		offset_iterate_dir(d_inode(dir), ctx);
> +		offset_iterate_dir(file, ctx);
>   	return 0;
>   }
>
Chuck Lever Dec. 14, 2024, 5:13 p.m. UTC | #2
On 12/8/24 12:11 PM, Chuck Lever wrote:
> On 12/4/24 10:52 AM, cel@kernel.org wrote:
>> From: Chuck Lever <chuck.lever@oracle.com>
>>
>> The mtree mechanism has been effective at creating directory offsets
>> that are stable over multiple opendir instances. However, it has not
>> been able to handle the subtleties of renames that are concurrent
>> with readdir.
>>
>> Instead of using the mtree to emit entries in the order of their
>> offset values, use it only to map incoming ctx->pos to a starting
>> entry. Then use the directory's d_children list, which is already
>> maintained properly by the dcache, to find the next child to emit.
>>
>> One of the sneaky things about this is that when the mtree-allocated
>> offset value wraps (which is very rare), looking up ctx->pos++ is
>> not going to find the next entry; it will return NULL. Instead, by
>> following the d_children list, the offset values can appear in any
>> order but all of the entries in the directory will be visited
>> eventually.
>>
>> Note also that the readdir() is guaranteed to reach the tail of this
>> list. Entries are added only at the head of d_children, and readdir
>> walks from its current position in that list towards its tail.
>>
>> Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
>> ---
>>   fs/libfs.c | 77 ++++++++++++++++++++++++++++++++++++++++--------------
>>   1 file changed, 57 insertions(+), 20 deletions(-)
>>
>> diff --git a/fs/libfs.c b/fs/libfs.c
>> index fcb2cdf6e3f3..398eac385094 100644
>> --- a/fs/libfs.c
>> +++ b/fs/libfs.c
>> @@ -243,12 +243,13 @@ EXPORT_SYMBOL(simple_dir_inode_operations);
>>   /* simple_offset_add() allocation range */
>>   enum {
>> -    DIR_OFFSET_MIN        = 2,
>> +    DIR_OFFSET_MIN        = 3,
>>       DIR_OFFSET_MAX        = LONG_MAX - 1,
>>   };
>>   /* simple_offset_add() never assigns these to a dentry */
>>   enum {
>> +    DIR_OFFSET_FIRST    = 2,        /* Find first real entry */
>>       DIR_OFFSET_EOD        = LONG_MAX,    /* Marks EOD */
>>   };
>> @@ -456,19 +457,43 @@ static loff_t offset_dir_llseek(struct file 
>> *file, loff_t offset, int whence)
>>       return vfs_setpos(file, offset, LONG_MAX);
>>   }
>> -static struct dentry *offset_find_next(struct offset_ctx *octx, 
>> loff_t offset)
>> +/* Cf. find_next_child() */
>> +static struct dentry *find_next_sibling_locked(struct dentry *parent,
>> +                           struct dentry *dentry)
> 
> There might be a better name for this function.
> 
> It looks a lot like find_next_child(), but it acts more like
> scan_positives(). It starts looking for positive dentries starting
> at @dentry, thus it can return the dentry that was passed in @dentry.
> 
> find_positive_from_locked()  ??
> 
> 
>>   {
>> -    MA_STATE(mas, &octx->mt, offset, offset);
>> +    struct dentry *found = NULL;
>> +
>> +    hlist_for_each_entry_from(dentry, d_sib) {
>> +        if (!simple_positive(dentry))
>> +            continue;
>> +        spin_lock_nested(&dentry->d_lock, DENTRY_D_LOCK_NESTED);
>> +        if (simple_positive(dentry))
>> +            found = dget_dlock(dentry);
>> +        spin_unlock(&dentry->d_lock);
>> +        if (likely(found))
>> +            break;
>> +    }
>> +    return found;
>> +}
>> +
>> +static noinline_for_stack struct dentry *
>> +offset_dir_lookup(struct file *file, loff_t offset)
>> +{
>> +    struct dentry *parent = file->f_path.dentry;
>>       struct dentry *child, *found = NULL;
>> +    struct inode *inode = d_inode(parent);
>> +    struct offset_ctx *octx = inode->i_op->get_offset_ctx(inode);
>> +
>> +    MA_STATE(mas, &octx->mt, offset, offset);
>>       rcu_read_lock();
>>       child = mas_find(&mas, DIR_OFFSET_MAX);
>>       if (!child)
>>           goto out;
>> -    spin_lock(&child->d_lock);
>> -    if (simple_positive(child))
>> -        found = dget_dlock(child);
>> -    spin_unlock(&child->d_lock);
>> +
>> +    spin_lock(&parent->d_lock);
>> +    found = find_next_sibling_locked(parent, child);
>> +    spin_unlock(&parent->d_lock);
>>   out:
>>       rcu_read_unlock();
>>       return found;
>> @@ -477,30 +502,42 @@ static struct dentry *offset_find_next(struct 
>> offset_ctx *octx, loff_t offset)
>>   static bool offset_dir_emit(struct dir_context *ctx, struct dentry 
>> *dentry)
>>   {
>>       struct inode *inode = d_inode(dentry);
>> -    long offset = dentry2offset(dentry);
>> -    return ctx->actor(ctx, dentry->d_name.name, dentry->d_name.len, 
>> offset,
>> -              inode->i_ino, fs_umode_to_dtype(inode->i_mode));
>> +    return dir_emit(ctx, dentry->d_name.name, dentry->d_name.len,
>> +            inode->i_ino, fs_umode_to_dtype(inode->i_mode));
>>   }
>> -static void offset_iterate_dir(struct inode *inode, struct 
>> dir_context *ctx)
>> +static void offset_iterate_dir(struct file *file, struct dir_context 
>> *ctx)
>>   {
>> -    struct offset_ctx *octx = inode->i_op->get_offset_ctx(inode);
>> +    struct dentry *dir = file->f_path.dentry;
>>       struct dentry *dentry;
>> +    if (ctx->pos == DIR_OFFSET_FIRST) {
>> +        spin_lock(&dir->d_lock);
>> +        dentry = find_next_sibling_locked(dir, d_first_child(dir));
>> +        spin_unlock(&dir->d_lock);
>> +    } else
>> +        dentry = offset_dir_lookup(file, ctx->pos);
>> +    if (!dentry)
>> +        goto out_eod;
>> +
>>       while (true) {
>> -        dentry = offset_find_next(octx, ctx->pos);
>> -        if (!dentry)
>> -            goto out_eod;
>> +        struct dentry *next;
>> -        if (!offset_dir_emit(ctx, dentry)) {
>> -            dput(dentry);
>> +        ctx->pos = dentry2offset(dentry);
>> +        if (!offset_dir_emit(ctx, dentry))
>>               break;
>> -        }
>> -        ctx->pos = dentry2offset(dentry) + 1;
>> +        spin_lock(&dir->d_lock);
>> +        next = find_next_sibling_locked(dir, d_next_sibling(dentry));
>> +        spin_unlock(&dir->d_lock);

Recent coverity report:

*** CID 1602474:  Concurrent data access violations  (ATOMICITY)
/fs/libfs.c: 536 in offset_iterate_dir()
530
531     		ctx->pos = dentry2offset(dentry);
532     		if (!offset_dir_emit(ctx, dentry))
533     			break;
534
535     		spin_lock(&dir->d_lock);
 >>>     CID 1602474:  Concurrent data access violations  (ATOMICITY)
 >>>     Using an unreliable value of "dentry" inside the second locked 
section. If the data that "dentry" depends on was changed by another 
thread, this use might be incorrect.
536     		next = find_next_sibling_locked(dir, d_next_sibling(dentry));
537     		spin_unlock(&dir->d_lock);
538     		dput(dentry);
539
540     		if (!next)
541     			goto out_eod;

As far as I can tell, @dentry's list fields, which are the only fields
accessed in find_next_sibling_locked(), are protected by dir->d_lock. We
don't care about the other fields.

Not sure if this is a false positive. Is there an annotation that will
help clarify this situation?


>>           dput(dentry);
>> +
>> +        if (!next)
>> +            goto out_eod;
>> +        dentry = next;
>>       }
>> +    dput(dentry);
>>       return;
>>   out_eod:
>> @@ -539,7 +576,7 @@ static int offset_readdir(struct file *file, 
>> struct dir_context *ctx)
>>       if (!dir_emit_dots(file, ctx))
>>           return 0;
>>       if (ctx->pos != DIR_OFFSET_EOD)
>> -        offset_iterate_dir(d_inode(dir), ctx);
>> +        offset_iterate_dir(file, ctx);
>>       return 0;
>>   }
> 
>
Al Viro Dec. 14, 2024, 5:49 p.m. UTC | #3
On Sat, Dec 14, 2024 at 12:13:30PM -0500, Chuck Lever wrote:
> > > +/* Cf. find_next_child() */
> > > +static struct dentry *find_next_sibling_locked(struct dentry *parent,
> > > +                           struct dentry *dentry)
> > 
> > There might be a better name for this function.

There might be better calling conventions for it, TBH.
AFAICS, all callers are directly surrounded by grabbing/releasing
->d_lock on parent.  Why not fold that in, and to hell with any
mentionings of "locked" in the name...
Chuck Lever Dec. 14, 2024, 7:22 p.m. UTC | #4
On 12/14/24 12:49 PM, Al Viro wrote:
> On Sat, Dec 14, 2024 at 12:13:30PM -0500, Chuck Lever wrote:
>>>> +/* Cf. find_next_child() */
>>>> +static struct dentry *find_next_sibling_locked(struct dentry *parent,
>>>> +                           struct dentry *dentry)
>>>
>>> There might be a better name for this function.
> 
> There might be better calling conventions for it, TBH.
> AFAICS, all callers are directly surrounded by grabbing/releasing
> ->d_lock on parent.  Why not fold that in, and to hell with any
> mentionings of "locked" in the name...

I've tried it both ways, couldn't make up my mind. I'll try it again.
Al Viro Dec. 14, 2024, 7:59 p.m. UTC | #5
On Sat, Dec 14, 2024 at 02:22:41PM -0500, Chuck Lever wrote:
> On 12/14/24 12:49 PM, Al Viro wrote:
> > On Sat, Dec 14, 2024 at 12:13:30PM -0500, Chuck Lever wrote:
> > > > > +/* Cf. find_next_child() */
> > > > > +static struct dentry *find_next_sibling_locked(struct dentry *parent,
> > > > > +                           struct dentry *dentry)
> > > > 
> > > > There might be a better name for this function.
> > 
> > There might be better calling conventions for it, TBH.
> > AFAICS, all callers are directly surrounded by grabbing/releasing
> > ->d_lock on parent.  Why not fold that in, and to hell with any
> > mentionings of "locked" in the name...
> 
> I've tried it both ways, couldn't make up my mind. I'll try it again.

Single return there, so it's just a single spin_lock() on entry and
spin_unlock() on exit...  No idea if it'll make coverity any happier,
but it would be easier on human readers.
diff mbox series

Patch

diff --git a/fs/libfs.c b/fs/libfs.c
index fcb2cdf6e3f3..398eac385094 100644
--- a/fs/libfs.c
+++ b/fs/libfs.c
@@ -243,12 +243,13 @@  EXPORT_SYMBOL(simple_dir_inode_operations);
 
 /* simple_offset_add() allocation range */
 enum {
-	DIR_OFFSET_MIN		= 2,
+	DIR_OFFSET_MIN		= 3,
 	DIR_OFFSET_MAX		= LONG_MAX - 1,
 };
 
 /* simple_offset_add() never assigns these to a dentry */
 enum {
+	DIR_OFFSET_FIRST	= 2,		/* Find first real entry */
 	DIR_OFFSET_EOD		= LONG_MAX,	/* Marks EOD */
 
 };
@@ -456,19 +457,43 @@  static loff_t offset_dir_llseek(struct file *file, loff_t offset, int whence)
 	return vfs_setpos(file, offset, LONG_MAX);
 }
 
-static struct dentry *offset_find_next(struct offset_ctx *octx, loff_t offset)
+/* Cf. find_next_child() */
+static struct dentry *find_next_sibling_locked(struct dentry *parent,
+					       struct dentry *dentry)
 {
-	MA_STATE(mas, &octx->mt, offset, offset);
+	struct dentry *found = NULL;
+
+	hlist_for_each_entry_from(dentry, d_sib) {
+		if (!simple_positive(dentry))
+			continue;
+		spin_lock_nested(&dentry->d_lock, DENTRY_D_LOCK_NESTED);
+		if (simple_positive(dentry))
+			found = dget_dlock(dentry);
+		spin_unlock(&dentry->d_lock);
+		if (likely(found))
+			break;
+	}
+	return found;
+}
+
+static noinline_for_stack struct dentry *
+offset_dir_lookup(struct file *file, loff_t offset)
+{
+	struct dentry *parent = file->f_path.dentry;
 	struct dentry *child, *found = NULL;
+	struct inode *inode = d_inode(parent);
+	struct offset_ctx *octx = inode->i_op->get_offset_ctx(inode);
+
+	MA_STATE(mas, &octx->mt, offset, offset);
 
 	rcu_read_lock();
 	child = mas_find(&mas, DIR_OFFSET_MAX);
 	if (!child)
 		goto out;
-	spin_lock(&child->d_lock);
-	if (simple_positive(child))
-		found = dget_dlock(child);
-	spin_unlock(&child->d_lock);
+
+	spin_lock(&parent->d_lock);
+	found = find_next_sibling_locked(parent, child);
+	spin_unlock(&parent->d_lock);
 out:
 	rcu_read_unlock();
 	return found;
@@ -477,30 +502,42 @@  static struct dentry *offset_find_next(struct offset_ctx *octx, loff_t offset)
 static bool offset_dir_emit(struct dir_context *ctx, struct dentry *dentry)
 {
 	struct inode *inode = d_inode(dentry);
-	long offset = dentry2offset(dentry);
 
-	return ctx->actor(ctx, dentry->d_name.name, dentry->d_name.len, offset,
-			  inode->i_ino, fs_umode_to_dtype(inode->i_mode));
+	return dir_emit(ctx, dentry->d_name.name, dentry->d_name.len,
+			inode->i_ino, fs_umode_to_dtype(inode->i_mode));
 }
 
-static void offset_iterate_dir(struct inode *inode, struct dir_context *ctx)
+static void offset_iterate_dir(struct file *file, struct dir_context *ctx)
 {
-	struct offset_ctx *octx = inode->i_op->get_offset_ctx(inode);
+	struct dentry *dir = file->f_path.dentry;
 	struct dentry *dentry;
 
+	if (ctx->pos == DIR_OFFSET_FIRST) {
+		spin_lock(&dir->d_lock);
+		dentry = find_next_sibling_locked(dir, d_first_child(dir));
+		spin_unlock(&dir->d_lock);
+	} else
+		dentry = offset_dir_lookup(file, ctx->pos);
+	if (!dentry)
+		goto out_eod;
+
 	while (true) {
-		dentry = offset_find_next(octx, ctx->pos);
-		if (!dentry)
-			goto out_eod;
+		struct dentry *next;
 
-		if (!offset_dir_emit(ctx, dentry)) {
-			dput(dentry);
+		ctx->pos = dentry2offset(dentry);
+		if (!offset_dir_emit(ctx, dentry))
 			break;
-		}
 
-		ctx->pos = dentry2offset(dentry) + 1;
+		spin_lock(&dir->d_lock);
+		next = find_next_sibling_locked(dir, d_next_sibling(dentry));
+		spin_unlock(&dir->d_lock);
 		dput(dentry);
+
+		if (!next)
+			goto out_eod;
+		dentry = next;
 	}
+	dput(dentry);
 	return;
 
 out_eod:
@@ -539,7 +576,7 @@  static int offset_readdir(struct file *file, struct dir_context *ctx)
 	if (!dir_emit_dots(file, ctx))
 		return 0;
 	if (ctx->pos != DIR_OFFSET_EOD)
-		offset_iterate_dir(d_inode(dir), ctx);
+		offset_iterate_dir(file, ctx);
 	return 0;
 }