
[v2,5/7] quota: fix dqput() to follow the guarantees dquot_srcu should provide

Message ID 20230628132155.1560425-6-libaokun1@huawei.com (mailing list archive)
State New, archived
Series quota: fix race condition between dqput() and dquot_mark_dquot_dirty()

Commit Message

Baokun Li June 28, 2023, 1:21 p.m. UTC
dquot_mark_dquot_dirty() uses dquot references from the inode and should
therefore be protected by dquot_srcu. The quota_off code takes care to
call synchronize_srcu(&dquot_srcu) so that dquot references are not
dropped while other users are still using them. But dquot_transfer()
breaks this assumption: it drops the last reference to a dquot and adds
it to free_dquots while other users may still be using that dquot, as
shown in the function graph below:

       cpu1              cpu2
_________________|_________________
wb_do_writeback         CHOWN(1)
 ...
  ext4_da_update_reserve_space
   dquot_claim_block
    ...
     dquot_mark_dquot_dirty // try to dirty old quota
      test_bit(DQ_ACTIVE_B, &dquot->dq_flags) // still ACTIVE
      if (test_bit(DQ_MOD_B, &dquot->dq_flags))
      // test no dirty, wait dq_list_lock
                    ...
                     dquot_transfer
                      __dquot_transfer
                      dqput_all(transfer_from) // rls old dquot
                       dqput // last dqput
                        dquot_release
                         clear_bit(DQ_ACTIVE_B, &dquot->dq_flags)
                        atomic_dec(&dquot->dq_count)
                        put_dquot_last(dquot)
                         list_add_tail(&dquot->dq_free, &free_dquots)
                         // add the dquot to free_dquots
      if (!test_and_set_bit(DQ_MOD_B, &dquot->dq_flags))
        add dqi_dirty_list // add released dquot to dirty_list

This can cause various issues. For example, the dquot can be destroyed
by dqcache_shrink_scan() after being added to free_dquots, which then
triggers a UAF in dquot_mark_dquot_dirty(). Or, after the dquot has been
added to free_dquots and then to the dirty list, it is added to
free_dquots a second time once dquot_writeback_dquots() runs; this
corrupts the free_dquots list and triggers a UAF when
dqcache_shrink_scan() ends up freeing the dquot twice.

As Honza said, we need to fix dquot_transfer() to follow the guarantees
dquot_srcu should provide. But calling synchronize_srcu() directly from
dquot_transfer() is too expensive (and mostly unnecessary). So instead,
add any dquot whose last reference is being dropped to the new global
list releasing_dquots, and queue a work item that calls
synchronize_srcu() and afterwards performs the final cleanup of all the
dquots on releasing_dquots.
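
For orientation, here is a condensed sketch of the scheme (the
authoritative code is the patch at the bottom of this page; the
releasing_dquots list and put_releasing_dquots() are not defined in the
hunks shown here, so their form below is only an assumption):

/* Sketch only -- see the full patch below for the real code. */
static LIST_HEAD(releasing_dquots);

static void quota_release_workfn(struct work_struct *work);
static DECLARE_DELAYED_WORK(quota_release_work, quota_release_workfn);

/* Assumed helper: park a dquot on releasing_dquots, keeping its last ref. */
static inline void put_releasing_dquots(struct dquot *dquot)
{
	list_add_tail(&dquot->dq_free, &releasing_dquots);
}

void dqput(struct dquot *dquot)
{
	/* ... early checks unchanged ... */
	spin_lock(&dq_list_lock);
	if (atomic_read(&dquot->dq_count) > 1) {
		/* More than one user: just drop our reference, as before. */
		atomic_dec(&dquot->dq_count);
		spin_unlock(&dq_list_lock);
		return;
	}
	/*
	 * Last reference: do not write back or release the dquot here,
	 * because dquot_srcu readers may still be about to dirty it.
	 * Park it on releasing_dquots and let the work item finish the
	 * job (synchronize_srcu(), write if dirty, release, move to
	 * free_dquots) after the SRCU grace period.
	 */
	put_releasing_dquots(dquot);
	spin_unlock(&dq_list_lock);
	queue_delayed_work(system_unbound_wq, &quota_release_work, 1);
}

With this, synchronize_srcu() runs once per batch of parked dquots in
the work item instead of once per dqput().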

Fixes: 4580b30ea887 ("quota: Do not dirty bad dquots")
Suggested-by: Jan Kara <jack@suse.cz>
Signed-off-by: Baokun Li <libaokun1@huawei.com>
---
 fs/quota/dquot.c | 85 ++++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 71 insertions(+), 14 deletions(-)

Comments

Jan Kara June 29, 2023, 10:59 a.m. UTC | #1
On Wed 28-06-23 21:21:53, Baokun Li wrote:
> dquot_mark_dquot_dirty() uses dquot references from the inode and should
> therefore be protected by dquot_srcu. The quota_off code takes care to
> call synchronize_srcu(&dquot_srcu) so that dquot references are not
> dropped while other users are still using them. But dquot_transfer()
> breaks this assumption: it drops the last reference to a dquot and adds
> it to free_dquots while other users may still be using that dquot, as
> shown in the function graph below:
> 
>        cpu1              cpu2
> _________________|_________________
> wb_do_writeback         CHOWN(1)
>  ...
>   ext4_da_update_reserve_space
>    dquot_claim_block
>     ...
>      dquot_mark_dquot_dirty // try to dirty old quota
>       test_bit(DQ_ACTIVE_B, &dquot->dq_flags) // still ACTIVE
>       if (test_bit(DQ_MOD_B, &dquot->dq_flags))
>       // test no dirty, wait dq_list_lock
>                     ...
>                      dquot_transfer
>                       __dquot_transfer
>                       dqput_all(transfer_from) // rls old dquot
>                        dqput // last dqput
>                         dquot_release
>                          clear_bit(DQ_ACTIVE_B, &dquot->dq_flags)
>                         atomic_dec(&dquot->dq_count)
>                         put_dquot_last(dquot)
>                          list_add_tail(&dquot->dq_free, &free_dquots)
>                          // add the dquot to free_dquots
>       if (!test_and_set_bit(DQ_MOD_B, &dquot->dq_flags))
>         add dqi_dirty_list // add released dquot to dirty_list
> 
> This can cause various issues. For example, the dquot can be destroyed
> by dqcache_shrink_scan() after being added to free_dquots, which then
> triggers a UAF in dquot_mark_dquot_dirty(). Or, after the dquot has been
> added to free_dquots and then to the dirty list, it is added to
> free_dquots a second time once dquot_writeback_dquots() runs; this
> corrupts the free_dquots list and triggers a UAF when
> dqcache_shrink_scan() ends up freeing the dquot twice.
> 
> As Honza said, we need to fix dquot_transfer() to follow the guarantees
> dquot_srcu should provide. But calling synchronize_srcu() directly from
> dquot_transfer() is too expensive (and mostly unnecessary). So instead,
> add any dquot whose last reference is being dropped to the new global
> list releasing_dquots, and queue a work item that calls
> synchronize_srcu() and afterwards performs the final cleanup of all the
> dquots on releasing_dquots.
> 
> Fixes: 4580b30ea887 ("quota: Do not dirty bad dquots")
> Suggested-by: Jan Kara <jack@suse.cz>
> Signed-off-by: Baokun Li <libaokun1@huawei.com>
> ---
>  fs/quota/dquot.c | 85 ++++++++++++++++++++++++++++++++++++++++--------
>  1 file changed, 71 insertions(+), 14 deletions(-)
> 
> diff --git a/fs/quota/dquot.c b/fs/quota/dquot.c
> index 09787e4f6a00..e8e702ac64e5 100644
> --- a/fs/quota/dquot.c
> +++ b/fs/quota/dquot.c
> @@ -270,6 +270,9 @@ static qsize_t inode_get_rsv_space(struct inode *inode);
>  static qsize_t __inode_get_rsv_space(struct inode *inode);
>  static int __dquot_initialize(struct inode *inode, int type);
>  
> +static void quota_release_workfn(struct work_struct *work);
> +static DECLARE_DELAYED_WORK(quota_release_work, quota_release_workfn);
> +
>  static inline unsigned int
>  hashfn(const struct super_block *sb, struct kqid qid)
>  {
> @@ -569,6 +572,8 @@ static void invalidate_dquots(struct super_block *sb, int type)
>  	struct dquot *dquot, *tmp;
>  
>  restart:
> +	flush_delayed_work(&quota_release_work);
> +
>  	spin_lock(&dq_list_lock);
>  	list_for_each_entry_safe(dquot, tmp, &inuse_list, dq_inuse) {
>  		if (dquot->dq_sb != sb)
> @@ -577,6 +582,12 @@ static void invalidate_dquots(struct super_block *sb, int type)
>  			continue;
>  		/* Wait for dquot users */
>  		if (atomic_read(&dquot->dq_count)) {
> +			/* dquot in releasing_dquots, flush and retry */
> +			if (!list_empty(&dquot->dq_free)) {
> +				spin_unlock(&dq_list_lock);
> +				goto restart;
> +			}
> +
>  			atomic_inc(&dquot->dq_count);
>  			spin_unlock(&dq_list_lock);
>  			/*
> @@ -760,6 +771,8 @@ dqcache_shrink_scan(struct shrinker *shrink, struct shrink_control *sc)
>  	struct dquot *dquot;
>  	unsigned long freed = 0;
>  
> +	flush_delayed_work(&quota_release_work);
> +

I would not flush the work here. Sure, it can make more dquots available
for reclaim but I think it is more important for the shrinker to not wait
on srcu period as shrinker can be called very frequently under memory
pressure.

>  	spin_lock(&dq_list_lock);
>  	while (!list_empty(&free_dquots) && sc->nr_to_scan) {
>  		dquot = list_first_entry(&free_dquots, struct dquot, dq_free);
> @@ -787,6 +800,60 @@ static struct shrinker dqcache_shrinker = {
>  	.seeks = DEFAULT_SEEKS,
>  };
>  
> +/*
> + * Safely release dquot and put reference to dquot.
> + */
> +static void quota_release_workfn(struct work_struct *work)
> +{
> +	struct dquot *dquot;
> +	struct list_head rls_head;
> +
> +	spin_lock(&dq_list_lock);
> +	/* Exchange the list head to avoid livelock. */
> +	list_replace_init(&releasing_dquots, &rls_head);
> +	spin_unlock(&dq_list_lock);
> +
> +restart:
> +	synchronize_srcu(&dquot_srcu);
> +	spin_lock(&dq_list_lock);
> +	while (!list_empty(&rls_head)) {

I think the logic below needs a bit more work. Firstly, I think that
dqget() should remove dquots from the releasing_dquots list - basically just
replace the:
	if (!atomic_read(&dquot->dq_count))
		remove_free_dquot(dquot);
with
	/* Dquot on releasing_dquots list? Drop ref kept by that list. */
	if (atomic_read(&dquot->dq_count) == 1 && !list_empty(&dquot->dq_free))
		atomic_dec(&dquot->dq_count);
	remove_free_dquot(dquot);
	atomic_inc(&dquot->dq_count);

That way we are sure that while we are holding dq_list_lock, all dquots on
rls_head list have dq_count == 1.

> +		dquot = list_first_entry(&rls_head, struct dquot, dq_free);
> +		if (dquot_dirty(dquot)) {
> +			spin_unlock(&dq_list_lock);
> +			/* Commit dquot before releasing */
> +			dquot_write_dquot(dquot);
> +			goto restart;
> +		}
> +		/* Always clear DQ_ACTIVE_B, unless racing with dqget() */
> +		if (dquot_active(dquot)) {
> +			spin_unlock(&dq_list_lock);
> +			dquot->dq_sb->dq_op->release_dquot(dquot);

I'd just go to restart here to make the logic simple. Forward progress is
guaranteed anyway and it isn't really much less efficient.

> +			spin_lock(&dq_list_lock);
> +		}
> +		/*
> +		 * During the execution of dquot_release() outside the
> +		 * dq_list_lock, another process may have completed
> +		 * dqget()/dqput()/mark_dirty().
> +		 */
> +		if (atomic_read(&dquot->dq_count) == 1 &&
> +		    (dquot_active(dquot) || dquot_dirty(dquot))) {
> +			spin_unlock(&dq_list_lock);
> +			goto restart;
> +		}

This can be dropped then...

> +		/*
> +		 * Now it is safe to remove this dquot from releasing_dquots
> +		 * and reduce its reference count.
> +		 */
> +		remove_free_dquot(dquot);
> +		atomic_dec(&dquot->dq_count);
> +
> +		/* We may be racing with some other dqget(). */
> +		if (!atomic_read(&dquot->dq_count))

This condition can also be dropped then.

> +			put_dquot_last(dquot);
> +	}
> +	spin_unlock(&dq_list_lock);
> +}
> +

The rest looks good.

								Honza
Baokun Li June 29, 2023, 11:47 a.m. UTC | #2
On 2023/6/29 18:59, Jan Kara wrote:
> On Wed 28-06-23 21:21:53, Baokun Li wrote:
>> @@ -760,6 +771,8 @@ dqcache_shrink_scan(struct shrinker *shrink, struct shrink_control *sc)
>>   	struct dquot *dquot;
>>   	unsigned long freed = 0;
>>   
>> +	flush_delayed_work(&quota_release_work);
>> +
> I would not flush the work here. Sure, it can make more dquots available
> for reclaim but I think it is more important for the shrinker to not wait
> on srcu period as shrinker can be called very frequently under memory
> pressure.
This is because I want to use remove_free_dquot() directly, and if I no
longer flush here, DQST_FREE_DQUOTS will not stay accurate.
Since that's the case, I'll remove the flush here and add a check to
remove_free_dquot() that decides whether DQST_FREE_DQUOTS should be
updated.
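
Something like the following, for example (a sketch only; the actual
change in the next version may differ):

/*
 * Sketch: assume dquots parked on releasing_dquots still hold a
 * reference and were never counted in DQST_FREE_DQUOTS, so only
 * decrement the counter for dquots coming off the real free list.
 */
static inline void remove_free_dquot(struct dquot *dquot)
{
	if (list_empty(&dquot->dq_free))
		return;
	list_del_init(&dquot->dq_free);
	if (!atomic_read(&dquot->dq_count))
		dqstats_dec(DQST_FREE_DQUOTS);
}
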
>>   	spin_lock(&dq_list_lock);
>>   	while (!list_empty(&free_dquots) && sc->nr_to_scan) {
>>   		dquot = list_first_entry(&free_dquots, struct dquot, dq_free);
>> @@ -787,6 +800,60 @@ static struct shrinker dqcache_shrinker = {
>>   	.seeks = DEFAULT_SEEKS,
>>   };
>>   
>> +/*
>> + * Safely release dquot and put reference to dquot.
>> + */
>> +static void quota_release_workfn(struct work_struct *work)
>> +{
>> +	struct dquot *dquot;
>> +	struct list_head rls_head;
>> +
>> +	spin_lock(&dq_list_lock);
>> +	/* Exchange the list head to avoid livelock. */
>> +	list_replace_init(&releasing_dquots, &rls_head);
>> +	spin_unlock(&dq_list_lock);
>> +
>> +restart:
>> +	synchronize_srcu(&dquot_srcu);
>> +	spin_lock(&dq_list_lock);
>> +	while (!list_empty(&rls_head)) {
> I think the logic below needs a bit more work. Firstly, I think that
> dqget() should remove dquots from the releasing_dquots list - basically just
> replace the:
> 	if (!atomic_read(&dquot->dq_count))
> 		remove_free_dquot(dquot);
> with
> 	/* Dquot on releasing_dquots list? Drop ref kept by that list. */
> 	if (atomic_read(&dquot->dq_count) == 1 && !list_empty(&dquot->dq_free))
> 		atomic_dec(&dquot->dq_count);
> 	remove_free_dquot(dquot);
> 	atomic_inc(&dquot->dq_count);
>
> That way we are sure that while we are holding dq_list_lock, all dquots on
> rls_head list have dq_count == 1.
I wrote it this way at first, but that turned out to be problematic, so
I ended up dropping the dq_count == 1 constraint for dquots on
releasing_dquots. In the following sequence, for example, we would hand
out a bad dquot directly:

quota_release_workfn
  spin_lock(&dq_list_lock)
  dquot = list_first_entry(&rls_head, struct dquot, dq_free)
  spin_unlock(&dq_list_lock)
  dquot->dq_sb->dq_op->release_dquot(dquot)
  release_dquot
        dqget
         atomic_dec(&dquot->dq_count)
         remove_free_dquot(dquot)
         atomic_inc(&dquot->dq_count)
         spin_unlock(&dq_list_lock)
         wait_on_dquot(dquot)
         if (!test_bit(DQ_ACTIVE_B, &dquot->dq_flags))
         // still active
  mutex_lock(&dquot->dq_lock)
  dquot_is_busy(dquot)
   atomic_read(&dquot->dq_count) > 1
  clear_bit(DQ_ACTIVE_B, &dquot->dq_flags)
  mutex_unlock(&dquot->dq_lock)

Removing the dquot from releasing_dquots in dqget() and dropping the
reference held by that list means the dquot_is_busy() check in
dquot_release() no longer detects the concurrent dqget(), and the
wait_on_dquot(dquot) in dqget() has no effect. This is also why I did
not go back to restart in the dquot_active() branch. Adding dquots to
releasing_dquots only in dqput() and removing them only in
quota_release_workfn() is a simple and effective way to ensure
consistency.


>> +		dquot = list_first_entry(&rls_head, struct dquot, dq_free);
>> +		if (dquot_dirty(dquot)) {
>> +			spin_unlock(&dq_list_lock);
>> +			/* Commit dquot before releasing */
>> +			dquot_write_dquot(dquot);
>> +			goto restart;
>> +		}
>> +		/* Always clear DQ_ACTIVE_B, unless racing with dqget() */
>> +		if (dquot_active(dquot)) {
>> +			spin_unlock(&dq_list_lock);
>> +			dquot->dq_sb->dq_op->release_dquot(dquot);
> I'd just go to restart here to make the logic simple. Forward progress is
> guaranteed anyway and it isn't really much less efficient.
>
>
> The rest looks good.
>
> 								Honza
Thanks!
Jan Kara June 29, 2023, 2:33 p.m. UTC | #3
On Thu 29-06-23 19:47:08, Baokun Li wrote:
> On 2023/6/29 18:59, Jan Kara wrote:
> > On Wed 28-06-23 21:21:53, Baokun Li wrote:
> > > @@ -760,6 +771,8 @@ dqcache_shrink_scan(struct shrinker *shrink, struct shrink_control *sc)
> > >   	struct dquot *dquot;
> > >   	unsigned long freed = 0;
> > > +	flush_delayed_work(&quota_release_work);
> > > +
> > I would not flush the work here. Sure, it can make more dquots available
> > for reclaim but I think it is more important for the shrinker to not wait
> > on srcu period as shrinker can be called very frequently under memory
> > pressure.
> This is because I want to use remove_free_dquot() directly, and if I no
> longer flush here, DQST_FREE_DQUOTS will not stay accurate.
> Since that's the case, I'll remove the flush here and add a check to
> remove_free_dquot() that decides whether DQST_FREE_DQUOTS should be
> updated.

OK.

> > >   	spin_lock(&dq_list_lock);
> > >   	while (!list_empty(&free_dquots) && sc->nr_to_scan) {
> > >   		dquot = list_first_entry(&free_dquots, struct dquot, dq_free);
> > > @@ -787,6 +800,60 @@ static struct shrinker dqcache_shrinker = {
> > >   	.seeks = DEFAULT_SEEKS,
> > >   };
> > > +/*
> > > + * Safely release dquot and put reference to dquot.
> > > + */
> > > +static void quota_release_workfn(struct work_struct *work)
> > > +{
> > > +	struct dquot *dquot;
> > > +	struct list_head rls_head;
> > > +
> > > +	spin_lock(&dq_list_lock);
> > > +	/* Exchange the list head to avoid livelock. */
> > > +	list_replace_init(&releasing_dquots, &rls_head);
> > > +	spin_unlock(&dq_list_lock);
> > > +
> > > +restart:
> > > +	synchronize_srcu(&dquot_srcu);
> > > +	spin_lock(&dq_list_lock);
> > > +	while (!list_empty(&rls_head)) {
> > I think the logic below needs a bit more work. Firstly, I think that
> > dqget() should remove dquots from the releasing_dquots list - basically just
> > replace the:
> > 	if (!atomic_read(&dquot->dq_count))
> > 		remove_free_dquot(dquot);
> > with
> > 	/* Dquot on releasing_dquots list? Drop ref kept by that list. */
> > 	if (atomic_read(&dquot->dq_count) == 1 && !list_empty(&dquot->dq_free))
> > 		atomic_dec(&dquot->dq_count);
> > 	remove_free_dquot(dquot);
> > 	atomic_inc(&dquot->dq_count);
> > 
> > That way we are sure that while we are holding dq_list_lock, all dquots on
> > rls_head list have dq_count == 1.
> I wrote it this way at first, but that turned out to be problematic, so
> I ended up dropping the dq_count == 1 constraint for dquots on
> releasing_dquots. In the following sequence, for example, we would hand
> out a bad dquot directly:
> 
> quota_release_workfn
>  spin_lock(&dq_list_lock)
>  dquot = list_first_entry(&rls_head, struct dquot, dq_free)
>  spin_unlock(&dq_list_lock)
>  dquot->dq_sb->dq_op->release_dquot(dquot)
>  release_dquot
>        dqget
>         atomic_dec(&dquot->dq_count)
>         remove_free_dquot(dquot)
>         atomic_inc(&dquot->dq_count)
>         spin_unlock(&dq_list_lock)
>         wait_on_dquot(dquot)
>         if (!test_bit(DQ_ACTIVE_B, &dquot->dq_flags))
>         // still active
>  mutex_lock(&dquot->dq_lock)
>  dquot_is_busy(dquot)
>   atomic_read(&dquot->dq_count) > 1
>  clear_bit(DQ_ACTIVE_B, &dquot->dq_flags)
>  mutex_unlock(&dquot->dq_lock)
> 
> Removing the dquot from releasing_dquots in dqget() and dropping the
> reference held by that list means the dquot_is_busy() check in
> dquot_release() no longer detects the concurrent dqget(), and the
> wait_on_dquot(dquot) in dqget() has no effect. This is also why I did
> not go back to restart in the dquot_active() branch. Adding dquots to
> releasing_dquots only in dqput() and removing them only in
> quota_release_workfn() is a simple and effective way to ensure
> consistency.

Indeed, that's a good point. Still, can't we simplify the loop like this:

	while (!list_empty(&rls_head)) {
		dquot = list_first_entry(&rls_head, struct dquot, dq_free);
		/* Dquot got used again? */
		if (atomic_read(&dquot->dq_count) > 1) {
			atomic_dec(&dquot->dq_count);
			remove_free_dquot(dquot);
			continue;
		}
		if (dquot_dirty(dquot)) {
			keep what you had
		}
		if (dquot_active(dquot)) {
			spin_unlock(&dq_list_lock);
			dquot->dq_sb->dq_op->release_dquot(dquot);
			goto restart;
		}
		/* Dquot is inactive and clean, we can move it to free list */
		atomic_dec(&dquot->dq_count);
		remove_free_dquot(dquot);
		put_dquot_last(dquot);
	}

What do you think?
								Honza
Baokun Li June 30, 2023, 7:45 a.m. UTC | #4
On 2023/6/29 22:33, Jan Kara wrote:
> On Thu 29-06-23 19:47:08, Baokun Li wrote:
>> On 2023/6/29 18:59, Jan Kara wrote:
>>> On Wed 28-06-23 21:21:53, Baokun Li wrote:
>>>> @@ -760,6 +771,8 @@ dqcache_shrink_scan(struct shrinker *shrink, struct shrink_control *sc)
>>>>    	struct dquot *dquot;
>>>>    	unsigned long freed = 0;
>>>> +	flush_delayed_work(&quota_release_work);
>>>> +
>>> I would not flush the work here. Sure, it can make more dquots available
>>> for reclaim but I think it is more important for the shrinker to not wait
>>> on srcu period as shrinker can be called very frequently under memory
>>> pressure.
>> This is because I want to use remove_free_dquot() directly, and if I no
>> longer flush here, DQST_FREE_DQUOTS will not stay accurate.
>> Since that's the case, I'll remove the flush here and add a check to
>> remove_free_dquot() that decides whether DQST_FREE_DQUOTS should be
>> updated.
> OK.
>
>>>>    	spin_lock(&dq_list_lock);
>>>>    	while (!list_empty(&free_dquots) && sc->nr_to_scan) {
>>>>    		dquot = list_first_entry(&free_dquots, struct dquot, dq_free);
>>>> @@ -787,6 +800,60 @@ static struct shrinker dqcache_shrinker = {
>>>>    	.seeks = DEFAULT_SEEKS,
>>>>    };
>>>> +/*
>>>> + * Safely release dquot and put reference to dquot.
>>>> + */
>>>> +static void quota_release_workfn(struct work_struct *work)
>>>> +{
>>>> +	struct dquot *dquot;
>>>> +	struct list_head rls_head;
>>>> +
>>>> +	spin_lock(&dq_list_lock);
>>>> +	/* Exchange the list head to avoid livelock. */
>>>> +	list_replace_init(&releasing_dquots, &rls_head);
>>>> +	spin_unlock(&dq_list_lock);
>>>> +
>>>> +restart:
>>>> +	synchronize_srcu(&dquot_srcu);
>>>> +	spin_lock(&dq_list_lock);
>>>> +	while (!list_empty(&rls_head)) {
>>> I think the logic below needs a bit more work. Firstly, I think that
>>> dqget() should remove dquots from the releasing_dquots list - basically just
>>> replace the:
>>> 	if (!atomic_read(&dquot->dq_count))
>>> 		remove_free_dquot(dquot);
>>> with
>>> 	/* Dquot on releasing_dquots list? Drop ref kept by that list. */
>>> 	if (atomic_read(&dquot->dq_count) == 1 && !list_empty(&dquot->dq_free))
>>> 		atomic_dec(&dquot->dq_count);
>>> 	remove_free_dquot(dquot);
>>> 	atomic_inc(&dquot->dq_count);
>>>
>>> That way we are sure that while we are holding dq_list_lock, all dquots on
>>> rls_head list have dq_count == 1.
>> I wrote it this way at first, but that turned out to be problematic, so
>> I ended up dropping the dq_count == 1 constraint for dquots on
>> releasing_dquots. In the following sequence, for example, we would hand
>> out a bad dquot directly:
>>
>> quota_release_workfn
>>   spin_lock(&dq_list_lock)
>>   dquot = list_first_entry(&rls_head, struct dquot, dq_free)
>>   spin_unlock(&dq_list_lock)
>>   dquot->dq_sb->dq_op->release_dquot(dquot)
>>   release_dquot
>>         dqget
>>          atomic_dec(&dquot->dq_count)
>>          remove_free_dquot(dquot)
>>          atomic_inc(&dquot->dq_count)
>>          spin_unlock(&dq_list_lock)
>>          wait_on_dquot(dquot)
>>          if (!test_bit(DQ_ACTIVE_B, &dquot->dq_flags))
>>          // still active
>>   mutex_lock(&dquot->dq_lock)
>>   dquot_is_busy(dquot)
>>    atomic_read(&dquot->dq_count) > 1
>>   clear_bit(DQ_ACTIVE_B, &dquot->dq_flags)
>>   mutex_unlock(&dquot->dq_lock)
>>
>> Removing the dquot from releasing_dquots in dqget() and dropping the
>> reference held by that list means the dquot_is_busy() check in
>> dquot_release() no longer detects the concurrent dqget(), and the
>> wait_on_dquot(dquot) in dqget() has no effect. This is also why I did
>> not go back to restart in the dquot_active() branch. Adding dquots to
>> releasing_dquots only in dqput() and removing them only in
>> quota_release_workfn() is a simple and effective way to ensure
>> consistency.
> Indeed, that's a good point. Still, can't we simplify the loop like this:
>
> 	while (!list_empty(&rls_head)) {
> 		dquot = list_first_entry(&rls_head, struct dquot, dq_free);
> 		/* Dquot got used again? */
> 		if (atomic_read(&dquot->dq_count) > 1) {
> 			atomic_dec(&dquot->dq_count);
> 			remove_free_dquot(dquot);
> 			continue;
> 		}
> 		if (dquot_dirty(dquot)) {
> 			keep what you had
> 		}
> 		if (dquot_active(dquot)) {
> 			spin_unlock(&dq_list_lock);
> 			dquot->dq_sb->dq_op->release_dquot(dquot);
> 			goto restart;
> 		}
> 		/* Dquot is inactive and clean, we can move it to free list */
> 		atomic_dec(&dquot->dq_count);
> 		remove_free_dquot(dquot);
> 		put_dquot_last(dquot);
> 	}
>
> What do you think?
> 								Honza
This looks great and the code is much cleaner. I'll send out the next
version later with your suggested changes!

Thank you so much for your patient review!

Patch

diff --git a/fs/quota/dquot.c b/fs/quota/dquot.c
index 09787e4f6a00..e8e702ac64e5 100644
--- a/fs/quota/dquot.c
+++ b/fs/quota/dquot.c
@@ -270,6 +270,9 @@  static qsize_t inode_get_rsv_space(struct inode *inode);
 static qsize_t __inode_get_rsv_space(struct inode *inode);
 static int __dquot_initialize(struct inode *inode, int type);
 
+static void quota_release_workfn(struct work_struct *work);
+static DECLARE_DELAYED_WORK(quota_release_work, quota_release_workfn);
+
 static inline unsigned int
 hashfn(const struct super_block *sb, struct kqid qid)
 {
@@ -569,6 +572,8 @@  static void invalidate_dquots(struct super_block *sb, int type)
 	struct dquot *dquot, *tmp;
 
 restart:
+	flush_delayed_work(&quota_release_work);
+
 	spin_lock(&dq_list_lock);
 	list_for_each_entry_safe(dquot, tmp, &inuse_list, dq_inuse) {
 		if (dquot->dq_sb != sb)
@@ -577,6 +582,12 @@  static void invalidate_dquots(struct super_block *sb, int type)
 			continue;
 		/* Wait for dquot users */
 		if (atomic_read(&dquot->dq_count)) {
+			/* dquot in releasing_dquots, flush and retry */
+			if (!list_empty(&dquot->dq_free)) {
+				spin_unlock(&dq_list_lock);
+				goto restart;
+			}
+
 			atomic_inc(&dquot->dq_count);
 			spin_unlock(&dq_list_lock);
 			/*
@@ -760,6 +771,8 @@  dqcache_shrink_scan(struct shrinker *shrink, struct shrink_control *sc)
 	struct dquot *dquot;
 	unsigned long freed = 0;
 
+	flush_delayed_work(&quota_release_work);
+
 	spin_lock(&dq_list_lock);
 	while (!list_empty(&free_dquots) && sc->nr_to_scan) {
 		dquot = list_first_entry(&free_dquots, struct dquot, dq_free);
@@ -787,6 +800,60 @@  static struct shrinker dqcache_shrinker = {
 	.seeks = DEFAULT_SEEKS,
 };
 
+/*
+ * Safely release dquot and put reference to dquot.
+ */
+static void quota_release_workfn(struct work_struct *work)
+{
+	struct dquot *dquot;
+	struct list_head rls_head;
+
+	spin_lock(&dq_list_lock);
+	/* Exchange the list head to avoid livelock. */
+	list_replace_init(&releasing_dquots, &rls_head);
+	spin_unlock(&dq_list_lock);
+
+restart:
+	synchronize_srcu(&dquot_srcu);
+	spin_lock(&dq_list_lock);
+	while (!list_empty(&rls_head)) {
+		dquot = list_first_entry(&rls_head, struct dquot, dq_free);
+		if (dquot_dirty(dquot)) {
+			spin_unlock(&dq_list_lock);
+			/* Commit dquot before releasing */
+			dquot_write_dquot(dquot);
+			goto restart;
+		}
+		/* Always clear DQ_ACTIVE_B, unless racing with dqget() */
+		if (dquot_active(dquot)) {
+			spin_unlock(&dq_list_lock);
+			dquot->dq_sb->dq_op->release_dquot(dquot);
+			spin_lock(&dq_list_lock);
+		}
+		/*
+		 * During the execution of dquot_release() outside the
+		 * dq_list_lock, another process may have completed
+		 * dqget()/dqput()/mark_dirty().
+		 */
+		if (atomic_read(&dquot->dq_count) == 1 &&
+		    (dquot_active(dquot) || dquot_dirty(dquot))) {
+			spin_unlock(&dq_list_lock);
+			goto restart;
+		}
+		/*
+		 * Now it is safe to remove this dquot from releasing_dquots
+		 * and reduce its reference count.
+		 */
+		remove_free_dquot(dquot);
+		atomic_dec(&dquot->dq_count);
+
+		/* We may be racing with some other dqget(). */
+		if (!atomic_read(&dquot->dq_count))
+			put_dquot_last(dquot);
+	}
+	spin_unlock(&dq_list_lock);
+}
+
 /*
  * Put reference to dquot
  */
@@ -803,7 +870,7 @@  void dqput(struct dquot *dquot)
 	}
 #endif
 	dqstats_inc(DQST_DROPS);
-we_slept:
+
 	spin_lock(&dq_list_lock);
 	if (atomic_read(&dquot->dq_count) > 1) {
 		/* We have more than one user... nothing to do */
@@ -815,25 +882,15 @@  void dqput(struct dquot *dquot)
 		spin_unlock(&dq_list_lock);
 		return;
 	}
+
 	/* Need to release dquot? */
-	if (dquot_dirty(dquot)) {
-		spin_unlock(&dq_list_lock);
-		/* Commit dquot before releasing */
-		dquot_write_dquot(dquot);
-		goto we_slept;
-	}
-	if (dquot_active(dquot)) {
-		spin_unlock(&dq_list_lock);
-		dquot->dq_sb->dq_op->release_dquot(dquot);
-		goto we_slept;
-	}
-	atomic_dec(&dquot->dq_count);
 #ifdef CONFIG_QUOTA_DEBUG
 	/* sanity check */
 	BUG_ON(!list_empty(&dquot->dq_free));
 #endif
-	put_dquot_last(dquot);
+	put_releasing_dquots(dquot);
 	spin_unlock(&dq_list_lock);
+	queue_delayed_work(system_unbound_wq, &quota_release_work, 1);
 }
 EXPORT_SYMBOL(dqput);