diff mbox

[16/23] pnfs: support for non-rpc layout drivers

Message ID 1306083123-11061-1-git-send-email-bharrosh@panasas.com (mailing list archive)
State New, archived
Headers show

Commit Message

Boaz Harrosh May 22, 2011, 4:52 p.m. UTC
From: Benny Halevy <bhalevy@panasas.com>

Non-rpc layout drivers, such as those for objects and blocks,
implement their own I/O path and error handling logic.
Therefore bypass NFS-based error handling for these layout drivers.

[fix lseg ref-count bugs, and null de-refs]
Signed-off-by: Boaz Harrosh <bharrosh@panasas.com>
[get rid of PNFS_USE_RPC_CODE]
[get rid of __nfs4_write_done_cb]
[revert useless change in nfs4_write_done_cb]
Signed-off-by: Benny Halevy <bhalevy@panasas.com>
---
 fs/nfs/internal.h       |    1 +
 fs/nfs/nfs4proc.c       |   13 +++++++++--
 fs/nfs/pnfs.c           |   52 ++++++++++++++++++++++++++++++++++++++++++++++-
 fs/nfs/pnfs.h           |    2 +
 include/linux/nfs_xdr.h |    2 +
 5 files changed, 66 insertions(+), 4 deletions(-)

Comments

Benny Halevy May 22, 2011, 7:40 p.m. UTC | #1
On 2011-05-22 19:52, Boaz Harrosh wrote:
> From: Benny Halevy <bhalevy@panasas.com>
> 
> Non-rpc layout driver such as for objects and blocks
> implement their own I/O path and error handling logic.
> Therefore bypass NFS-based error handling for these layout drivers.
> 
> [fix lseg ref-count bugs, and null de-refs]
> Signed-off-by: Boaz Harrosh <bharrosh@panasas.com>
> [get rid of PNFS_USE_RPC_CODE]
> [get rid of __nfs4_write_done_cb]
> [revert useless change in nfs4_write_done_cb]
> Signed-off-by: Benny Halevy <bhalevy@panasas.com>
> ---
>  fs/nfs/internal.h       |    1 +
>  fs/nfs/nfs4proc.c       |   13 +++++++++--
>  fs/nfs/pnfs.c           |   52 ++++++++++++++++++++++++++++++++++++++++++++++-
>  fs/nfs/pnfs.h           |    2 +
>  include/linux/nfs_xdr.h |    2 +
>  5 files changed, 66 insertions(+), 4 deletions(-)
> 
> diff --git a/fs/nfs/internal.h b/fs/nfs/internal.h
> index ce118ce..bcf0f0f 100644
> --- a/fs/nfs/internal.h
> +++ b/fs/nfs/internal.h
> @@ -310,6 +310,7 @@ extern int nfs_migrate_page(struct address_space *,
>  #endif
>  
>  /* nfs4proc.c */
> +extern void __nfs4_read_done_cb(struct nfs_read_data *);
>  extern void nfs4_reset_read(struct rpc_task *task, struct nfs_read_data *data);
>  extern int nfs4_init_client(struct nfs_client *clp,
>  			    const struct rpc_timeout *timeparms,
> diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
> index 69c0f3c..61f30b2 100644
> --- a/fs/nfs/nfs4proc.c
> +++ b/fs/nfs/nfs4proc.c
> @@ -3174,6 +3174,11 @@ static int nfs4_proc_pathconf(struct nfs_server *server, struct nfs_fh *fhandle,
>  	return err;
>  }
>  
> +void __nfs4_read_done_cb(struct nfs_read_data *data)
> +{
> +	nfs_invalidate_atime(data->inode);
> +}
> +
>  static int nfs4_read_done_cb(struct rpc_task *task, struct nfs_read_data *data)
>  {
>  	struct nfs_server *server = NFS_SERVER(data->inode);
> @@ -3183,7 +3188,7 @@ static int nfs4_read_done_cb(struct rpc_task *task, struct nfs_read_data *data)
>  		return -EAGAIN;
>  	}
>  
> -	nfs_invalidate_atime(data->inode);
> +	__nfs4_read_done_cb(data);
>  	if (task->tk_status > 0)
>  		renew_lease(server, data->timestamp);
>  	return 0;
> @@ -3197,7 +3202,8 @@ static int nfs4_read_done(struct rpc_task *task, struct nfs_read_data *data)
>  	if (!nfs4_sequence_done(task, &data->res.seq_res))
>  		return -EAGAIN;
>  
> -	return data->read_done_cb(task, data);
> +	return data->read_done_cb ? data->read_done_cb(task, data) :
> +				    nfs4_read_done_cb(task, data);
>  }
>  
>  static void nfs4_proc_read_setup(struct nfs_read_data *data, struct rpc_message *msg)
> @@ -3242,7 +3248,8 @@ static int nfs4_write_done(struct rpc_task *task, struct nfs_write_data *data)
>  {
>  	if (!nfs4_sequence_done(task, &data->res.seq_res))
>  		return -EAGAIN;
> -	return data->write_done_cb(task, data);
> +	return data->write_done_cb ? data->write_done_cb(task, data) :
> +		nfs4_write_done_cb(task, data);
>  }
>  
>  /* Reset the the nfs_write_data to send the write to the MDS. */
> diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
> index 7f283b2..d4f8ba8 100644
> --- a/fs/nfs/pnfs.c
> +++ b/fs/nfs/pnfs.c
> @@ -254,7 +254,7 @@ put_lseg_common(struct pnfs_layout_segment *lseg)
>  {
>  	struct inode *inode = lseg->pls_layout->plh_inode;
>  
> -	BUG_ON(test_bit(NFS_LSEG_VALID, &lseg->pls_flags));
> +	WARN_ON(test_bit(NFS_LSEG_VALID, &lseg->pls_flags));
>  	list_del_init(&lseg->pls_list);
>  	if (list_empty(&lseg->pls_layout->plh_segs)) {
>  		set_bit(NFS_LAYOUT_DESTROYED, &lseg->pls_layout->plh_flags);
> @@ -1059,6 +1059,31 @@ pnfs_pageio_init_write(struct nfs_pageio_descriptor *pgio, struct inode *inode)
>  	pgio->pg_test = (ld && ld->pg_test) ? pnfs_write_pg_test : NULL;
>  }
>  
> +/*
> + * Called by non rpc-based layout drivers
> + */
> +int
> +pnfs_ld_write_done(struct nfs_write_data *data)
> +{
> +	int status;
> +


> +	if (!data->pnfs_error) {
> +		pnfs_set_layoutcommit(data);

We need at least to set data->task.tk_status to 0

> +		data->mds_ops->rpc_call_done(&data->task, data);
> +		data->mds_ops->rpc_release(data);

Where's the put_lseg you had in PATCH 10/13?

Benny

> +		return 0;
> +	}
> +
> +	put_lseg(data->lseg);
> +	data->lseg = NULL;
> +	dprintk("%s: pnfs_error=%d, retry via MDS\n", __func__,
> +		data->pnfs_error);
> +	status = nfs_initiate_write(data, NFS_CLIENT(data->inode),
> +				    data->mds_ops, NFS_FILE_SYNC);
> +	return status ? : -EAGAIN;
> +}
> +EXPORT_SYMBOL_GPL(pnfs_ld_write_done);
> +
>  enum pnfs_try_status
>  pnfs_try_to_write_data(struct nfs_write_data *wdata,
>  			const struct rpc_call_ops *call_ops, int how)
> @@ -1084,6 +1109,31 @@ pnfs_try_to_write_data(struct nfs_write_data *wdata,
>  }
>  
>  /*
> + * Called by non rpc-based layout drivers
> + */
> +int
> +pnfs_ld_read_done(struct nfs_read_data *data)
> +{
> +	int status;
> +
> +	if (!data->pnfs_error) {
> +		__nfs4_read_done_cb(data);
> +		data->mds_ops->rpc_call_done(&data->task, data);
> +		data->mds_ops->rpc_release(data);
> +		return 0;
> +	}
> +
> +	put_lseg(data->lseg);
> +	data->lseg = NULL;
> +	dprintk("%s: pnfs_error=%d, retry via MDS\n", __func__,
> +		data->pnfs_error);
> +	status = nfs_initiate_read(data, NFS_CLIENT(data->inode),
> +				   data->mds_ops);
> +	return status ? : -EAGAIN;
> +}
> +EXPORT_SYMBOL_GPL(pnfs_ld_read_done);
> +
> +/*
>   * Call the appropriate parallel I/O subsystem read function.
>   */
>  enum pnfs_try_status
> diff --git a/fs/nfs/pnfs.h b/fs/nfs/pnfs.h
> index e24c7fb..2f8776b 100644
> --- a/fs/nfs/pnfs.h
> +++ b/fs/nfs/pnfs.h
> @@ -166,6 +166,8 @@ void pnfs_roc_set_barrier(struct inode *ino, u32 barrier);
>  bool pnfs_roc_drain(struct inode *ino, u32 *barrier);
>  void pnfs_set_layoutcommit(struct nfs_write_data *wdata);
>  int pnfs_layoutcommit_inode(struct inode *inode, bool sync);
> +int pnfs_ld_write_done(struct nfs_write_data *);
> +int pnfs_ld_read_done(struct nfs_read_data *);
>  
>  static inline int lo_fail_bit(u32 iomode)
>  {
> diff --git a/include/linux/nfs_xdr.h b/include/linux/nfs_xdr.h
> index 890dce2..39c1e1b 100644
> --- a/include/linux/nfs_xdr.h
> +++ b/include/linux/nfs_xdr.h
> @@ -1086,6 +1086,7 @@ struct nfs_read_data {
>  	const struct rpc_call_ops *mds_ops;
>  	int (*read_done_cb) (struct rpc_task *task, struct nfs_read_data *data);
>  	__u64			mds_offset;
> +	int			pnfs_error;
>  	struct page		*page_array[NFS_PAGEVEC_SIZE];
>  };
>  
> @@ -1111,6 +1112,7 @@ struct nfs_write_data {
>  	unsigned long		timestamp;	/* For lease renewal */
>  #endif
>  	__u64			mds_offset;	/* Filelayout dense stripe */
> +	int			pnfs_error;
>  	struct page		*page_array[NFS_PAGEVEC_SIZE];
>  };
>  

--
To unsubscribe from this list: send the line "unsubscribe linux-nfs" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Benny Halevy May 22, 2011, 11:25 p.m. UTC | #2
On 2011-05-22 22:40, Benny Halevy wrote:
> On 2011-05-22 19:52, Boaz Harrosh wrote:
>> From: Benny Halevy <bhalevy@panasas.com>
>> +int
>> +pnfs_ld_write_done(struct nfs_write_data *data)
>> +{
>> +	int status;
>> +
> 
> 
>> +	if (!data->pnfs_error) {
>> +		pnfs_set_layoutcommit(data);
> 
> We need at least to set data->task.tk_status to 0
> 
>> +		data->mds_ops->rpc_call_done(&data->task, data);
>> +		data->mds_ops->rpc_release(data);
> 
> Where's the put_lseg you had in PATCH 10/13?

OK, this is done correctly in the nfs_*data_release
functions.

Benny

> 
> Benny
> 
>> +		return 0;
>> +	}
>> +
>> +	put_lseg(data->lseg);
>> +	data->lseg = NULL;
>> +	dprintk("%s: pnfs_error=%d, retry via MDS\n", __func__,
>> +		data->pnfs_error);
>> +	status = nfs_initiate_write(data, NFS_CLIENT(data->inode),
>> +				    data->mds_ops, NFS_FILE_SYNC);
>> +	return status ? : -EAGAIN;
>> +}
>> +EXPORT_SYMBOL_GPL(pnfs_ld_write_done);
>> +
>>  enum pnfs_try_status
>>  pnfs_try_to_write_data(struct nfs_write_data *wdata,
>>  			const struct rpc_call_ops *call_ops, int how)
>> @@ -1084,6 +1109,31 @@ pnfs_try_to_write_data(struct nfs_write_data *wdata,
>>  }
>>  
>>  /*
>> + * Called by non rpc-based layout drivers
>> + */
>> +int
>> +pnfs_ld_read_done(struct nfs_read_data *data)
>> +{
>> +	int status;
>> +
>> +	if (!data->pnfs_error) {
>> +		__nfs4_read_done_cb(data);
>> +		data->mds_ops->rpc_call_done(&data->task, data);
>> +		data->mds_ops->rpc_release(data);
>> +		return 0;
>> +	}
>> +
>> +	put_lseg(data->lseg);
>> +	data->lseg = NULL;
>> +	dprintk("%s: pnfs_error=%d, retry via MDS\n", __func__,
>> +		data->pnfs_error);
>> +	status = nfs_initiate_read(data, NFS_CLIENT(data->inode),
>> +				   data->mds_ops);
>> +	return status ? : -EAGAIN;
>> +}
>> +EXPORT_SYMBOL_GPL(pnfs_ld_read_done);
>> +
>> +/*
>>   * Call the appropriate parallel I/O subsystem read function.
>>   */
>>  enum pnfs_try_status
>> diff --git a/fs/nfs/pnfs.h b/fs/nfs/pnfs.h
>> index e24c7fb..2f8776b 100644
>> --- a/fs/nfs/pnfs.h
>> +++ b/fs/nfs/pnfs.h
>> @@ -166,6 +166,8 @@ void pnfs_roc_set_barrier(struct inode *ino, u32 barrier);
>>  bool pnfs_roc_drain(struct inode *ino, u32 *barrier);
>>  void pnfs_set_layoutcommit(struct nfs_write_data *wdata);
>>  int pnfs_layoutcommit_inode(struct inode *inode, bool sync);
>> +int pnfs_ld_write_done(struct nfs_write_data *);
>> +int pnfs_ld_read_done(struct nfs_read_data *);
>>  
>>  static inline int lo_fail_bit(u32 iomode)
>>  {
>> diff --git a/include/linux/nfs_xdr.h b/include/linux/nfs_xdr.h
>> index 890dce2..39c1e1b 100644
>> --- a/include/linux/nfs_xdr.h
>> +++ b/include/linux/nfs_xdr.h
>> @@ -1086,6 +1086,7 @@ struct nfs_read_data {
>>  	const struct rpc_call_ops *mds_ops;
>>  	int (*read_done_cb) (struct rpc_task *task, struct nfs_read_data *data);
>>  	__u64			mds_offset;
>> +	int			pnfs_error;
>>  	struct page		*page_array[NFS_PAGEVEC_SIZE];
>>  };
>>  
>> @@ -1111,6 +1112,7 @@ struct nfs_write_data {
>>  	unsigned long		timestamp;	/* For lease renewal */
>>  #endif
>>  	__u64			mds_offset;	/* Filelayout dense stripe */
>> +	int			pnfs_error;
>>  	struct page		*page_array[NFS_PAGEVEC_SIZE];
>>  };
>>  
> 
> --
> To unsubscribe from this list: send the line "unsubscribe linux-nfs" in
> the body of a message to majordomo@vger.kernel.org
> More majordomo info at  http://vger.kernel.org/majordomo-info.html

--
To unsubscribe from this list: send the line "unsubscribe linux-nfs" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Boaz Harrosh May 23, 2011, 4:22 a.m. UTC | #3
On 05/22/2011 10:40 PM, Benny Halevy wrote:
>> +/*
>> + * Called by non rpc-based layout drivers
>> + */
>> +int
>> +pnfs_ld_write_done(struct nfs_write_data *data)
>> +{
>> +	int status;
>> +
> 
> 
>> +	if (!data->pnfs_error) {
>> +		pnfs_set_layoutcommit(data);
> 
> We need at least to set data->task.tk_status to 0

I guess it does not hurt, but we never touched it, is it garbage?

> 
>> +		data->mds_ops->rpc_call_done(&data->task, data);
>> +		data->mds_ops->rpc_release(data);
> 
> Where's the put_lseg you had in PATCH 10/13?
> 
> Benny

That was the bug. Please see my SQUASHME patches; I explained
it all there.

> 
>> +		return 0;
>> +	}
>> +
>> +	put_lseg(data->lseg);
>> +	data->lseg = NULL;

I'm not sure it is needed here as well.
Fred! Please see this code.
We know that the lseg is put in nfs_writedata_release().
Does the nfs_initiate_write() below retake the ref?
If it does, we need the put here. If it does not, we
don't need it here.

Boaz

>> +	dprintk("%s: pnfs_error=%d, retry via MDS\n", __func__,
>> +		data->pnfs_error);
>> +	status = nfs_initiate_write(data, NFS_CLIENT(data->inode),
>> +				    data->mds_ops, NFS_FILE_SYNC);
>> +	return status ? : -EAGAIN;
>> +}
--
To unsubscribe from this list: send the line "unsubscribe linux-nfs" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Fred Isaman May 23, 2011, 2:54 p.m. UTC | #4
On Mon, May 23, 2011 at 12:22 AM, Boaz Harrosh <bharrosh@panasas.com> wrote:
> On 05/22/2011 10:40 PM, Benny Halevy wrote:
>>> +/*
>>> + * Called by non rpc-based layout drivers
>>> + */
>>> +int
>>> +pnfs_ld_write_done(struct nfs_write_data *data)
>>> +{
>>> +    int status;
>>> +
>>
>>
>>> +    if (!data->pnfs_error) {
>>> +            pnfs_set_layoutcommit(data);
>>
>> We need at least to set data->task.tk_status to 0
>
> I guess it does not hurt, but we never touched it, is it garbage?
>
>>
>>> +            data->mds_ops->rpc_call_done(&data->task, data);
>>> +            data->mds_ops->rpc_release(data);
>>
>> Where's the put_lseg you had in PATCH 10/13?
>>
>> Benny
>
> That was the bug. Please see my SQUASHME patches I explained
> it all there.
>
>>
>>> +            return 0;
>>> +    }
>>> +
>>> +    put_lseg(data->lseg);
>>> +    data->lseg = NULL;
>
> I'm not sure it is needed here as well.
> Fred! please see this code
> We know that the lseg is put in nfs_writedata_release()
> Does the below nfs_initiate_write() retakes the ref.
> If it does we need the put here. If it does not we
> don't need here.
>

nfs_initiate_read/write() did not take any reference to the lseg.
This was done immediately prior in the nfs_read/write_rpcsetup
functions.

Note however that the commit code takes references in the file driver code.

Fred

> Boaz
>
>>> +    dprintk("%s: pnfs_error=%d, retry via MDS\n", __func__,
>>> +            data->pnfs_error);
>>> +    status = nfs_initiate_write(data, NFS_CLIENT(data->inode),
>>> +                                data->mds_ops, NFS_FILE_SYNC);
>>> +    return status ? : -EAGAIN;
>>> +}
> --
> To unsubscribe from this list: send the line "unsubscribe linux-nfs" in
> the body of a message to majordomo@vger.kernel.org
> More majordomo info at  http://vger.kernel.org/majordomo-info.html
>
--
To unsubscribe from this list: send the line "unsubscribe linux-nfs" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Boaz Harrosh May 23, 2011, 5:56 p.m. UTC | #5
On 05/23/2011 05:54 PM, Fred Isaman wrote:
> On Mon, May 23, 2011 at 12:22 AM, Boaz Harrosh <bharrosh@panasas.com> wrote:
>> On 05/22/2011 10:40 PM, Benny Halevy wrote:
>>>> +/*
>>>> + * Called by non rpc-based layout drivers
>>>> + */
>>>> +int
>>>> +pnfs_ld_write_done(struct nfs_write_data *data)
>>>> +{
>>>> +    int status;
>>>> +
>>>
>>>
>>>> +    if (!data->pnfs_error) {
>>>> +            pnfs_set_layoutcommit(data);
>>>
>>> We need at least to set data->task.tk_status to 0
>>
>> I guess it does not hurt, but we never touched it, is it garbage?
>>
>>>
>>>> +            data->mds_ops->rpc_call_done(&data->task, data);
>>>> +            data->mds_ops->rpc_release(data);
>>>
>>> Where's the put_lseg you had in PATCH 10/13?
>>>
>>> Benny
>>
>> That was the bug. Please see my SQUASHME patches I explained
>> it all there.
>>
>>>
>>>> +            return 0;
>>>> +    }
>>>> +
>>>> +    put_lseg(data->lseg);
>>>> +    data->lseg = NULL;
>>
>> I'm not sure it is needed here as well.
>> Fred! please see this code
>> We know that the lseg is put in nfs_writedata_release()
>> Does the below nfs_initiate_write() retakes the ref.
>> If it does we need the put here. If it does not we
>> don't need here.
>>
> 
> nfs_initiate_read/write() did not take any reference to the lseg.
> This was done immediately prior in the nfs_read/write_rpcsetup
> functions.
> 

Thanks. Yes, you are right. I actually tested that and I saw the bug.
The put here in the error case needs to be removed as well.
Benny, I'll send a squashme. (I inserted simulated errors to test the error path.)

> Note however that the commit code takes references in the file driver code.
> 

We don't do commit in objects yet, and Benny did not implement the
pnfs_ld_commit_done, yet. But we will need it eventually.

I hope that the ref taken by the file driver is balanced by the driver
and not by generic code, right? If not, we might need to fix that somewhere.

> Fred
> 

Thanks
>> Boaz
>>
>>>> +    dprintk("%s: pnfs_error=%d, retry via MDS\n", __func__,
>>>> +            data->pnfs_error);
>>>> +    status = nfs_initiate_write(data, NFS_CLIENT(data->inode),
>>>> +                                data->mds_ops, NFS_FILE_SYNC);
>>>> +    return status ? : -EAGAIN;
>>>> +}
--
To unsubscribe from this list: send the line "unsubscribe linux-nfs" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
diff mbox

Patch

diff --git a/fs/nfs/internal.h b/fs/nfs/internal.h
index ce118ce..bcf0f0f 100644
--- a/fs/nfs/internal.h
+++ b/fs/nfs/internal.h
@@ -310,6 +310,7 @@  extern int nfs_migrate_page(struct address_space *,
 #endif
 
 /* nfs4proc.c */
+extern void __nfs4_read_done_cb(struct nfs_read_data *);
 extern void nfs4_reset_read(struct rpc_task *task, struct nfs_read_data *data);
 extern int nfs4_init_client(struct nfs_client *clp,
 			    const struct rpc_timeout *timeparms,
diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index 69c0f3c..61f30b2 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -3174,6 +3174,11 @@  static int nfs4_proc_pathconf(struct nfs_server *server, struct nfs_fh *fhandle,
 	return err;
 }
 
+void __nfs4_read_done_cb(struct nfs_read_data *data)
+{
+	nfs_invalidate_atime(data->inode);
+}
+
 static int nfs4_read_done_cb(struct rpc_task *task, struct nfs_read_data *data)
 {
 	struct nfs_server *server = NFS_SERVER(data->inode);
@@ -3183,7 +3188,7 @@  static int nfs4_read_done_cb(struct rpc_task *task, struct nfs_read_data *data)
 		return -EAGAIN;
 	}
 
-	nfs_invalidate_atime(data->inode);
+	__nfs4_read_done_cb(data);
 	if (task->tk_status > 0)
 		renew_lease(server, data->timestamp);
 	return 0;
@@ -3197,7 +3202,8 @@  static int nfs4_read_done(struct rpc_task *task, struct nfs_read_data *data)
 	if (!nfs4_sequence_done(task, &data->res.seq_res))
 		return -EAGAIN;
 
-	return data->read_done_cb(task, data);
+	return data->read_done_cb ? data->read_done_cb(task, data) :
+				    nfs4_read_done_cb(task, data);
 }
 
 static void nfs4_proc_read_setup(struct nfs_read_data *data, struct rpc_message *msg)
@@ -3242,7 +3248,8 @@  static int nfs4_write_done(struct rpc_task *task, struct nfs_write_data *data)
 {
 	if (!nfs4_sequence_done(task, &data->res.seq_res))
 		return -EAGAIN;
-	return data->write_done_cb(task, data);
+	return data->write_done_cb ? data->write_done_cb(task, data) :
+		nfs4_write_done_cb(task, data);
 }
 
 /* Reset the the nfs_write_data to send the write to the MDS. */
diff --git a/fs/nfs/pnfs.c b/fs/nfs/pnfs.c
index 7f283b2..d4f8ba8 100644
--- a/fs/nfs/pnfs.c
+++ b/fs/nfs/pnfs.c
@@ -254,7 +254,7 @@  put_lseg_common(struct pnfs_layout_segment *lseg)
 {
 	struct inode *inode = lseg->pls_layout->plh_inode;
 
-	BUG_ON(test_bit(NFS_LSEG_VALID, &lseg->pls_flags));
+	WARN_ON(test_bit(NFS_LSEG_VALID, &lseg->pls_flags));
 	list_del_init(&lseg->pls_list);
 	if (list_empty(&lseg->pls_layout->plh_segs)) {
 		set_bit(NFS_LAYOUT_DESTROYED, &lseg->pls_layout->plh_flags);
@@ -1059,6 +1059,31 @@  pnfs_pageio_init_write(struct nfs_pageio_descriptor *pgio, struct inode *inode)
 	pgio->pg_test = (ld && ld->pg_test) ? pnfs_write_pg_test : NULL;
 }
 
+/*
+ * Called by non rpc-based layout drivers
+ */
+int
+pnfs_ld_write_done(struct nfs_write_data *data)
+{
+	int status;
+
+	if (!data->pnfs_error) {
+		pnfs_set_layoutcommit(data);
+		data->mds_ops->rpc_call_done(&data->task, data);
+		data->mds_ops->rpc_release(data);
+		return 0;
+	}
+
+	put_lseg(data->lseg);
+	data->lseg = NULL;
+	dprintk("%s: pnfs_error=%d, retry via MDS\n", __func__,
+		data->pnfs_error);
+	status = nfs_initiate_write(data, NFS_CLIENT(data->inode),
+				    data->mds_ops, NFS_FILE_SYNC);
+	return status ? : -EAGAIN;
+}
+EXPORT_SYMBOL_GPL(pnfs_ld_write_done);
+
 enum pnfs_try_status
 pnfs_try_to_write_data(struct nfs_write_data *wdata,
 			const struct rpc_call_ops *call_ops, int how)
@@ -1084,6 +1109,31 @@  pnfs_try_to_write_data(struct nfs_write_data *wdata,
 }
 
 /*
+ * Called by non rpc-based layout drivers
+ */
+int
+pnfs_ld_read_done(struct nfs_read_data *data)
+{
+	int status;
+
+	if (!data->pnfs_error) {
+		__nfs4_read_done_cb(data);
+		data->mds_ops->rpc_call_done(&data->task, data);
+		data->mds_ops->rpc_release(data);
+		return 0;
+	}
+
+	put_lseg(data->lseg);
+	data->lseg = NULL;
+	dprintk("%s: pnfs_error=%d, retry via MDS\n", __func__,
+		data->pnfs_error);
+	status = nfs_initiate_read(data, NFS_CLIENT(data->inode),
+				   data->mds_ops);
+	return status ? : -EAGAIN;
+}
+EXPORT_SYMBOL_GPL(pnfs_ld_read_done);
+
+/*
  * Call the appropriate parallel I/O subsystem read function.
  */
 enum pnfs_try_status
diff --git a/fs/nfs/pnfs.h b/fs/nfs/pnfs.h
index e24c7fb..2f8776b 100644
--- a/fs/nfs/pnfs.h
+++ b/fs/nfs/pnfs.h
@@ -166,6 +166,8 @@  void pnfs_roc_set_barrier(struct inode *ino, u32 barrier);
 bool pnfs_roc_drain(struct inode *ino, u32 *barrier);
 void pnfs_set_layoutcommit(struct nfs_write_data *wdata);
 int pnfs_layoutcommit_inode(struct inode *inode, bool sync);
+int pnfs_ld_write_done(struct nfs_write_data *);
+int pnfs_ld_read_done(struct nfs_read_data *);
 
 static inline int lo_fail_bit(u32 iomode)
 {
diff --git a/include/linux/nfs_xdr.h b/include/linux/nfs_xdr.h
index 890dce2..39c1e1b 100644
--- a/include/linux/nfs_xdr.h
+++ b/include/linux/nfs_xdr.h
@@ -1086,6 +1086,7 @@  struct nfs_read_data {
 	const struct rpc_call_ops *mds_ops;
 	int (*read_done_cb) (struct rpc_task *task, struct nfs_read_data *data);
 	__u64			mds_offset;
+	int			pnfs_error;
 	struct page		*page_array[NFS_PAGEVEC_SIZE];
 };
 
@@ -1111,6 +1112,7 @@  struct nfs_write_data {
 	unsigned long		timestamp;	/* For lease renewal */
 #endif
 	__u64			mds_offset;	/* Filelayout dense stripe */
+	int			pnfs_error;
 	struct page		*page_array[NFS_PAGEVEC_SIZE];
 };