Message ID | 20241204155257.1110338-6-cel@kernel.org (mailing list archive) |
---|---|
State | New |
Headers | show |
Series | Improve simple directory offset wrap behavior | expand |
On 12/4/24 10:52 AM, cel@kernel.org wrote: > From: Chuck Lever <chuck.lever@oracle.com> > > The mtree mechanism has been effective at creating directory offsets > that are stable over multiple opendir instances. However, it has not > been able to handle the subtleties of renames that are concurrent > with readdir. > > Instead of using the mtree to emit entries in the order of their > offset values, use it only to map incoming ctx->pos to a starting > entry. Then use the directory's d_children list, which is already > maintained properly by the dcache, to find the next child to emit. > > One of the sneaky things about this is that when the mtree-allocated > offset value wraps (which is very rare), looking up ctx->pos++ is > not going to find the next entry; it will return NULL. Instead, by > following the d_children list, the offset values can appear in any > order but all of the entries in the directory will be visited > eventually. > > Note also that the readdir() is guaranteed to reach the tail of this > list. Entries are added only at the head of d_children, and readdir > walks from its current position in that list towards its tail. > > Signed-off-by: Chuck Lever <chuck.lever@oracle.com> > --- > fs/libfs.c | 77 ++++++++++++++++++++++++++++++++++++++++-------------- > 1 file changed, 57 insertions(+), 20 deletions(-) > > diff --git a/fs/libfs.c b/fs/libfs.c > index fcb2cdf6e3f3..398eac385094 100644 > --- a/fs/libfs.c > +++ b/fs/libfs.c > @@ -243,12 +243,13 @@ EXPORT_SYMBOL(simple_dir_inode_operations); > > /* simple_offset_add() allocation range */ > enum { > - DIR_OFFSET_MIN = 2, > + DIR_OFFSET_MIN = 3, > DIR_OFFSET_MAX = LONG_MAX - 1, > }; > > /* simple_offset_add() never assigns these to a dentry */ > enum { > + DIR_OFFSET_FIRST = 2, /* Find first real entry */ > DIR_OFFSET_EOD = LONG_MAX, /* Marks EOD */ > > }; > @@ -456,19 +457,43 @@ static loff_t offset_dir_llseek(struct file *file, loff_t offset, int whence) > return vfs_setpos(file, offset, LONG_MAX); > } > > -static struct dentry *offset_find_next(struct offset_ctx *octx, loff_t offset) > +/* Cf. find_next_child() */ > +static struct dentry *find_next_sibling_locked(struct dentry *parent, > + struct dentry *dentry) There might be a better name for this function. It looks a lot like find_next_child(), but it acts more like scan_positives(). It starts looking for positive dentries starting at @dentry, thus it can return the dentry that was passed in @dentry. find_positive_from_locked() ?? > { > - MA_STATE(mas, &octx->mt, offset, offset); > + struct dentry *found = NULL; > + > + hlist_for_each_entry_from(dentry, d_sib) { > + if (!simple_positive(dentry)) > + continue; > + spin_lock_nested(&dentry->d_lock, DENTRY_D_LOCK_NESTED); > + if (simple_positive(dentry)) > + found = dget_dlock(dentry); > + spin_unlock(&dentry->d_lock); > + if (likely(found)) > + break; > + } > + return found; > +} > + > +static noinline_for_stack struct dentry * > +offset_dir_lookup(struct file *file, loff_t offset) > +{ > + struct dentry *parent = file->f_path.dentry; > struct dentry *child, *found = NULL; > + struct inode *inode = d_inode(parent); > + struct offset_ctx *octx = inode->i_op->get_offset_ctx(inode); > + > + MA_STATE(mas, &octx->mt, offset, offset); > > rcu_read_lock(); > child = mas_find(&mas, DIR_OFFSET_MAX); > if (!child) > goto out; > - spin_lock(&child->d_lock); > - if (simple_positive(child)) > - found = dget_dlock(child); > - spin_unlock(&child->d_lock); > + > + spin_lock(&parent->d_lock); > + found = find_next_sibling_locked(parent, child); > + spin_unlock(&parent->d_lock); > out: > rcu_read_unlock(); > return found; > @@ -477,30 +502,42 @@ static struct dentry *offset_find_next(struct offset_ctx *octx, loff_t offset) > static bool offset_dir_emit(struct dir_context *ctx, struct dentry *dentry) > { > struct inode *inode = d_inode(dentry); > - long offset = dentry2offset(dentry); > > - return ctx->actor(ctx, dentry->d_name.name, dentry->d_name.len, offset, > - inode->i_ino, fs_umode_to_dtype(inode->i_mode)); > + return dir_emit(ctx, dentry->d_name.name, dentry->d_name.len, > + inode->i_ino, fs_umode_to_dtype(inode->i_mode)); > } > > -static void offset_iterate_dir(struct inode *inode, struct dir_context *ctx) > +static void offset_iterate_dir(struct file *file, struct dir_context *ctx) > { > - struct offset_ctx *octx = inode->i_op->get_offset_ctx(inode); > + struct dentry *dir = file->f_path.dentry; > struct dentry *dentry; > > + if (ctx->pos == DIR_OFFSET_FIRST) { > + spin_lock(&dir->d_lock); > + dentry = find_next_sibling_locked(dir, d_first_child(dir)); > + spin_unlock(&dir->d_lock); > + } else > + dentry = offset_dir_lookup(file, ctx->pos); > + if (!dentry) > + goto out_eod; > + > while (true) { > - dentry = offset_find_next(octx, ctx->pos); > - if (!dentry) > - goto out_eod; > + struct dentry *next; > > - if (!offset_dir_emit(ctx, dentry)) { > - dput(dentry); > + ctx->pos = dentry2offset(dentry); > + if (!offset_dir_emit(ctx, dentry)) > break; > - } > > - ctx->pos = dentry2offset(dentry) + 1; > + spin_lock(&dir->d_lock); > + next = find_next_sibling_locked(dir, d_next_sibling(dentry)); > + spin_unlock(&dir->d_lock); > dput(dentry); > + > + if (!next) > + goto out_eod; > + dentry = next; > } > + dput(dentry); > return; > > out_eod: > @@ -539,7 +576,7 @@ static int offset_readdir(struct file *file, struct dir_context *ctx) > if (!dir_emit_dots(file, ctx)) > return 0; > if (ctx->pos != DIR_OFFSET_EOD) > - offset_iterate_dir(d_inode(dir), ctx); > + offset_iterate_dir(file, ctx); > return 0; > } >
On 12/8/24 12:11 PM, Chuck Lever wrote: > On 12/4/24 10:52 AM, cel@kernel.org wrote: >> From: Chuck Lever <chuck.lever@oracle.com> >> >> The mtree mechanism has been effective at creating directory offsets >> that are stable over multiple opendir instances. However, it has not >> been able to handle the subtleties of renames that are concurrent >> with readdir. >> >> Instead of using the mtree to emit entries in the order of their >> offset values, use it only to map incoming ctx->pos to a starting >> entry. Then use the directory's d_children list, which is already >> maintained properly by the dcache, to find the next child to emit. >> >> One of the sneaky things about this is that when the mtree-allocated >> offset value wraps (which is very rare), looking up ctx->pos++ is >> not going to find the next entry; it will return NULL. Instead, by >> following the d_children list, the offset values can appear in any >> order but all of the entries in the directory will be visited >> eventually. >> >> Note also that the readdir() is guaranteed to reach the tail of this >> list. Entries are added only at the head of d_children, and readdir >> walks from its current position in that list towards its tail. >> >> Signed-off-by: Chuck Lever <chuck.lever@oracle.com> >> --- >> fs/libfs.c | 77 ++++++++++++++++++++++++++++++++++++++++-------------- >> 1 file changed, 57 insertions(+), 20 deletions(-) >> >> diff --git a/fs/libfs.c b/fs/libfs.c >> index fcb2cdf6e3f3..398eac385094 100644 >> --- a/fs/libfs.c >> +++ b/fs/libfs.c >> @@ -243,12 +243,13 @@ EXPORT_SYMBOL(simple_dir_inode_operations); >> /* simple_offset_add() allocation range */ >> enum { >> - DIR_OFFSET_MIN = 2, >> + DIR_OFFSET_MIN = 3, >> DIR_OFFSET_MAX = LONG_MAX - 1, >> }; >> /* simple_offset_add() never assigns these to a dentry */ >> enum { >> + DIR_OFFSET_FIRST = 2, /* Find first real entry */ >> DIR_OFFSET_EOD = LONG_MAX, /* Marks EOD */ >> }; >> @@ -456,19 +457,43 @@ static loff_t offset_dir_llseek(struct file >> *file, loff_t offset, int whence) >> return vfs_setpos(file, offset, LONG_MAX); >> } >> -static struct dentry *offset_find_next(struct offset_ctx *octx, >> loff_t offset) >> +/* Cf. find_next_child() */ >> +static struct dentry *find_next_sibling_locked(struct dentry *parent, >> + struct dentry *dentry) > > There might be a better name for this function. > > It looks a lot like find_next_child(), but it acts more like > scan_positives(). It starts looking for positive dentries starting > at @dentry, thus it can return the dentry that was passed in @dentry. > > find_positive_from_locked() ?? > > >> { >> - MA_STATE(mas, &octx->mt, offset, offset); >> + struct dentry *found = NULL; >> + >> + hlist_for_each_entry_from(dentry, d_sib) { >> + if (!simple_positive(dentry)) >> + continue; >> + spin_lock_nested(&dentry->d_lock, DENTRY_D_LOCK_NESTED); >> + if (simple_positive(dentry)) >> + found = dget_dlock(dentry); >> + spin_unlock(&dentry->d_lock); >> + if (likely(found)) >> + break; >> + } >> + return found; >> +} >> + >> +static noinline_for_stack struct dentry * >> +offset_dir_lookup(struct file *file, loff_t offset) >> +{ >> + struct dentry *parent = file->f_path.dentry; >> struct dentry *child, *found = NULL; >> + struct inode *inode = d_inode(parent); >> + struct offset_ctx *octx = inode->i_op->get_offset_ctx(inode); >> + >> + MA_STATE(mas, &octx->mt, offset, offset); >> rcu_read_lock(); >> child = mas_find(&mas, DIR_OFFSET_MAX); >> if (!child) >> goto out; >> - spin_lock(&child->d_lock); >> - if (simple_positive(child)) >> - found = dget_dlock(child); >> - spin_unlock(&child->d_lock); >> + >> + spin_lock(&parent->d_lock); >> + found = find_next_sibling_locked(parent, child); >> + spin_unlock(&parent->d_lock); >> out: >> rcu_read_unlock(); >> return found; >> @@ -477,30 +502,42 @@ static struct dentry *offset_find_next(struct >> offset_ctx *octx, loff_t offset) >> static bool offset_dir_emit(struct dir_context *ctx, struct dentry >> *dentry) >> { >> struct inode *inode = d_inode(dentry); >> - long offset = dentry2offset(dentry); >> - return ctx->actor(ctx, dentry->d_name.name, dentry->d_name.len, >> offset, >> - inode->i_ino, fs_umode_to_dtype(inode->i_mode)); >> + return dir_emit(ctx, dentry->d_name.name, dentry->d_name.len, >> + inode->i_ino, fs_umode_to_dtype(inode->i_mode)); >> } >> -static void offset_iterate_dir(struct inode *inode, struct >> dir_context *ctx) >> +static void offset_iterate_dir(struct file *file, struct dir_context >> *ctx) >> { >> - struct offset_ctx *octx = inode->i_op->get_offset_ctx(inode); >> + struct dentry *dir = file->f_path.dentry; >> struct dentry *dentry; >> + if (ctx->pos == DIR_OFFSET_FIRST) { >> + spin_lock(&dir->d_lock); >> + dentry = find_next_sibling_locked(dir, d_first_child(dir)); >> + spin_unlock(&dir->d_lock); >> + } else >> + dentry = offset_dir_lookup(file, ctx->pos); >> + if (!dentry) >> + goto out_eod; >> + >> while (true) { >> - dentry = offset_find_next(octx, ctx->pos); >> - if (!dentry) >> - goto out_eod; >> + struct dentry *next; >> - if (!offset_dir_emit(ctx, dentry)) { >> - dput(dentry); >> + ctx->pos = dentry2offset(dentry); >> + if (!offset_dir_emit(ctx, dentry)) >> break; >> - } >> - ctx->pos = dentry2offset(dentry) + 1; >> + spin_lock(&dir->d_lock); >> + next = find_next_sibling_locked(dir, d_next_sibling(dentry)); >> + spin_unlock(&dir->d_lock); Recent coverity report: *** CID 1602474: Concurrent data access violations (ATOMICITY) /fs/libfs.c: 536 in offset_iterate_dir() 530 531 ctx->pos = dentry2offset(dentry); 532 if (!offset_dir_emit(ctx, dentry)) 533 break; 534 535 spin_lock(&dir->d_lock); >>> CID 1602474: Concurrent data access violations (ATOMICITY) >>> Using an unreliable value of "dentry" inside the second locked section. If the data that "dentry" depends on was changed by another thread, this use might be incorrect. 536 next = find_next_sibling_locked(dir, d_next_sibling(dentry)); 537 spin_unlock(&dir->d_lock); 538 dput(dentry); 539 540 if (!next) 541 goto out_eod; As far as I can tell, @dentry's list fields, which are the only fields accessed in find_next_sibling_locked(), are protected by dir->d_lock. We don't care about the other fields. Not sure if this is a false positive. Is there an annotation that will help clarify this situation? >> dput(dentry); >> + >> + if (!next) >> + goto out_eod; >> + dentry = next; >> } >> + dput(dentry); >> return; >> out_eod: >> @@ -539,7 +576,7 @@ static int offset_readdir(struct file *file, >> struct dir_context *ctx) >> if (!dir_emit_dots(file, ctx)) >> return 0; >> if (ctx->pos != DIR_OFFSET_EOD) >> - offset_iterate_dir(d_inode(dir), ctx); >> + offset_iterate_dir(file, ctx); >> return 0; >> } > >
On Sat, Dec 14, 2024 at 12:13:30PM -0500, Chuck Lever wrote: > > > +/* Cf. find_next_child() */ > > > +static struct dentry *find_next_sibling_locked(struct dentry *parent, > > > + struct dentry *dentry) > > > > There might be a better name for this function. There might be better calling conventions for it, TBH. AFAICS, all callers are directly surrounded by grabbing/releasing ->d_lock on parent. Why not fold that in, and to hell with any mentionings of "locked" in the name...
On 12/14/24 12:49 PM, Al Viro wrote: > On Sat, Dec 14, 2024 at 12:13:30PM -0500, Chuck Lever wrote: >>>> +/* Cf. find_next_child() */ >>>> +static struct dentry *find_next_sibling_locked(struct dentry *parent, >>>> + struct dentry *dentry) >>> >>> There might be a better name for this function. > > There might be better calling conventions for it, TBH. > AFAICS, all callers are directly surrounded by grabbing/releasing > ->d_lock on parent. Why not fold that in, and to hell with any > mentionings of "locked" in the name... I've tried it both ways, couldn't make up my mind. I'll try it again.
On Sat, Dec 14, 2024 at 02:22:41PM -0500, Chuck Lever wrote: > On 12/14/24 12:49 PM, Al Viro wrote: > > On Sat, Dec 14, 2024 at 12:13:30PM -0500, Chuck Lever wrote: > > > > > +/* Cf. find_next_child() */ > > > > > +static struct dentry *find_next_sibling_locked(struct dentry *parent, > > > > > + struct dentry *dentry) > > > > > > > > There might be a better name for this function. > > > > There might be better calling conventions for it, TBH. > > AFAICS, all callers are directly surrounded by grabbing/releasing > > ->d_lock on parent. Why not fold that in, and to hell with any > > mentionings of "locked" in the name... > > I've tried it both ways, couldn't make up my mind. I'll try it again. Single return there, so it's just a single spin_lock() on entry and spin_unlock() on exit... No idea if it'll make coverity any happier, but it would be easier on human readers.
diff --git a/fs/libfs.c b/fs/libfs.c index fcb2cdf6e3f3..398eac385094 100644 --- a/fs/libfs.c +++ b/fs/libfs.c @@ -243,12 +243,13 @@ EXPORT_SYMBOL(simple_dir_inode_operations); /* simple_offset_add() allocation range */ enum { - DIR_OFFSET_MIN = 2, + DIR_OFFSET_MIN = 3, DIR_OFFSET_MAX = LONG_MAX - 1, }; /* simple_offset_add() never assigns these to a dentry */ enum { + DIR_OFFSET_FIRST = 2, /* Find first real entry */ DIR_OFFSET_EOD = LONG_MAX, /* Marks EOD */ }; @@ -456,19 +457,43 @@ static loff_t offset_dir_llseek(struct file *file, loff_t offset, int whence) return vfs_setpos(file, offset, LONG_MAX); } -static struct dentry *offset_find_next(struct offset_ctx *octx, loff_t offset) +/* Cf. find_next_child() */ +static struct dentry *find_next_sibling_locked(struct dentry *parent, + struct dentry *dentry) { - MA_STATE(mas, &octx->mt, offset, offset); + struct dentry *found = NULL; + + hlist_for_each_entry_from(dentry, d_sib) { + if (!simple_positive(dentry)) + continue; + spin_lock_nested(&dentry->d_lock, DENTRY_D_LOCK_NESTED); + if (simple_positive(dentry)) + found = dget_dlock(dentry); + spin_unlock(&dentry->d_lock); + if (likely(found)) + break; + } + return found; +} + +static noinline_for_stack struct dentry * +offset_dir_lookup(struct file *file, loff_t offset) +{ + struct dentry *parent = file->f_path.dentry; struct dentry *child, *found = NULL; + struct inode *inode = d_inode(parent); + struct offset_ctx *octx = inode->i_op->get_offset_ctx(inode); + + MA_STATE(mas, &octx->mt, offset, offset); rcu_read_lock(); child = mas_find(&mas, DIR_OFFSET_MAX); if (!child) goto out; - spin_lock(&child->d_lock); - if (simple_positive(child)) - found = dget_dlock(child); - spin_unlock(&child->d_lock); + + spin_lock(&parent->d_lock); + found = find_next_sibling_locked(parent, child); + spin_unlock(&parent->d_lock); out: rcu_read_unlock(); return found; @@ -477,30 +502,42 @@ static struct dentry *offset_find_next(struct offset_ctx *octx, loff_t offset) static bool offset_dir_emit(struct dir_context *ctx, struct dentry *dentry) { struct inode *inode = d_inode(dentry); - long offset = dentry2offset(dentry); - return ctx->actor(ctx, dentry->d_name.name, dentry->d_name.len, offset, - inode->i_ino, fs_umode_to_dtype(inode->i_mode)); + return dir_emit(ctx, dentry->d_name.name, dentry->d_name.len, + inode->i_ino, fs_umode_to_dtype(inode->i_mode)); } -static void offset_iterate_dir(struct inode *inode, struct dir_context *ctx) +static void offset_iterate_dir(struct file *file, struct dir_context *ctx) { - struct offset_ctx *octx = inode->i_op->get_offset_ctx(inode); + struct dentry *dir = file->f_path.dentry; struct dentry *dentry; + if (ctx->pos == DIR_OFFSET_FIRST) { + spin_lock(&dir->d_lock); + dentry = find_next_sibling_locked(dir, d_first_child(dir)); + spin_unlock(&dir->d_lock); + } else + dentry = offset_dir_lookup(file, ctx->pos); + if (!dentry) + goto out_eod; + while (true) { - dentry = offset_find_next(octx, ctx->pos); - if (!dentry) - goto out_eod; + struct dentry *next; - if (!offset_dir_emit(ctx, dentry)) { - dput(dentry); + ctx->pos = dentry2offset(dentry); + if (!offset_dir_emit(ctx, dentry)) break; - } - ctx->pos = dentry2offset(dentry) + 1; + spin_lock(&dir->d_lock); + next = find_next_sibling_locked(dir, d_next_sibling(dentry)); + spin_unlock(&dir->d_lock); dput(dentry); + + if (!next) + goto out_eod; + dentry = next; } + dput(dentry); return; out_eod: @@ -539,7 +576,7 @@ static int offset_readdir(struct file *file, struct dir_context *ctx) if (!dir_emit_dots(file, ctx)) return 0; if (ctx->pos != DIR_OFFSET_EOD) - offset_iterate_dir(d_inode(dir), ctx); + offset_iterate_dir(file, ctx); return 0; }