[v4,04/21] xprtrdma: Update rkeys after transport reconnect

Message ID 20140722145133.6010.52757.stgit@manet.1015granger.net (mailing list archive)
State New, archived

Commit Message

Chuck Lever July 22, 2014, 2:51 p.m. UTC
Various reports of:

  rpcrdma_qp_async_error_upcall: QP error 3 on device mlx4_0
		ep ffff8800bfd3e848

Ensure that rkeys in already-marshalled RPC/RDMA headers are
refreshed after the QP has been replaced by a reconnect.

BugLink: https://bugzilla.linux-nfs.org/show_bug.cgi?id=249
Suggested-by: Selvin Xavier <Selvin.Xavier@Emulex.Com>
Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Tested-by: Steve Wise <swise@opengridcomputing.com>
Tested-by: Shirley Ma <shirley.ma@oracle.com>
Tested-by: Devesh Sharma <devesh.sharma@emulex.com>
---
 net/sunrpc/xprtrdma/rpc_rdma.c  |   75 ++++++++++++++++++++-------------------
 net/sunrpc/xprtrdma/transport.c |   11 +++---
 net/sunrpc/xprtrdma/xprt_rdma.h |   10 +++++
 3 files changed, 54 insertions(+), 42 deletions(-)
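
In short: on the first transmission of a request, req->rl_niovs is
zero and the whole RPC/RDMA header is marshaled. On a retransmission
after a reconnect, a transport using FRMR re-runs only the chunk
marshaling step, so the already-built header picks up the rkeys of
the re-registered MRs. Condensed from the transport.c hunk in this
patch:

	if (req->rl_niovs == 0)
		rc = rpcrdma_marshal_req(rqst);		/* first send */
	else if (r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_FRMR)
		rc = rpcrdma_marshal_chunks(rqst, 0);	/* refresh rkeys */
	if (rc < 0)
		goto failed_marshal;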



Comments

Schumaker, Anna July 22, 2014, 6:40 p.m. UTC | #1
On 07/22/2014 10:51 AM, Chuck Lever wrote:
>  	/* The following simplification is not true forever */
> -	if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
> -		wtype = rpcrdma_noch;
> -	if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) {
> +	if (req->rl_rtype != rpcrdma_noch && req->rl_wtype == rpcrdma_replych)
> +		req->rl_wtype = rpcrdma_noch;
> +	if (req->rl_rtype != rpcrdma_noch && req->rl_wtype != rpcrdma_noch) {
>  		dprintk("RPC:       %s: cannot marshal multiple chunk lists\n",
>  			__func__);

Is this still a simplification, and is it still "not true forever"?

Anna

Chuck Lever July 24, 2014, 8:03 p.m. UTC | #2
On Jul 22, 2014, at 2:40 PM, Anna Schumaker <Anna.Schumaker@netapp.com> wrote:

> On 07/22/2014 10:51 AM, Chuck Lever wrote:
>> 	/* The following simplification is not true forever */
>> -	if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
>> -		wtype = rpcrdma_noch;
>> -	if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) {
>> +	if (req->rl_rtype != rpcrdma_noch && req->rl_wtype == rpcrdma_replych)
>> +		req->rl_wtype = rpcrdma_noch;
>> +	if (req->rl_rtype != rpcrdma_noch && req->rl_wtype != rpcrdma_noch) {
>> 		dprintk("RPC:       %s: cannot marshal multiple chunk lists\n",
>> 			__func__);
> 
> Is this still a simplification,

Yes.

I asked Tom Talpey for some clarification of his shorthand.

Generic RPC/RDMA allows an RPC/RDMA header to contain multiple chunk
lists. A chunk list is a vector of memory locations used during an
RDMA READ or WRITE. Only RPC operations that might carry a large
payload would use chunk lists.

However, NFSv2 and NFSv3 operations (particularly READ and WRITE,
which potentially carry large payloads) never need more than a single
chunk list. This is because there are no NFS operations that ever
move large amounts of data in both directions at once: either the
NFS call is large and the NFS reply is small, or vice versa.

This logic ensures there is no more than one chunk list in each
RPC/RDMA header, since NFS is the only RPC consumer of the kernel's
RPC/RDMA transport that might require chunk lists.

That simplifies the marshaling logic, but it is no longer generic;
it works specifically for our in-kernel NFS client implementation.

> and is it still "not true forever”?

Yes.

NFSv4 so far has been modeled after NFSv2 and NFSv3, in that compounds
that do reads and writes bear only one READ or WRITE operation (either
of which would use a single chunk list). If someday the NFS client were
modified to perform multiple READ or WRITE operations per compound,
this simplification would no longer be valid for NFSv4, since a
compound could employ more than one chunk list (one for each READ or
WRITE operation in the compound).
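
To make the one-chunk-list invariant concrete, here is a minimal
userspace sketch (an illustration only, not kernel code: thresholds
are simplified and the write-chunk case is omitted) of the chunk-type
selection described above:

	#include <stdio.h>

	enum chunktype { noch, readch, replych };

	static const char *names[] = { "none", "read list", "reply chunk" };

	/* Mirrors the shape of rpcrdma_marshal_req(): pick a chunk type
	 * for each direction, then enforce at most one chunk list. */
	static int pick_types(size_t call_len, size_t reply_len,
			      size_t inline_max,
			      enum chunktype *rtype, enum chunktype *wtype)
	{
		*wtype = (reply_len <= inline_max) ? noch : replych;
		*rtype = (call_len <= inline_max) ? noch : readch;

		/* "The following simplification is not true forever" */
		if (*rtype != noch && *wtype == replych)
			*wtype = noch;
		if (*rtype != noch && *wtype != noch)
			return -1;	/* two chunk lists: cannot marshal */
		return 0;
	}

	int main(void)
	{
		enum chunktype r, w;

		/* NFS WRITE-like: large call, small reply */
		if (pick_types(1048576, 128, 1024, &r, &w) == 0)
			printf("WRITE-like: rtype=%s wtype=%s\n",
			       names[r], names[w]);
		/* NFS READ-like: small call, large reply */
		if (pick_types(128, 1048576, 1024, &r, &w) == 0)
			printf("READ-like:  rtype=%s wtype=%s\n",
			       names[r], names[w]);
		return 0;
	}

In each case exactly one direction ends up with a chunk list, which
is why a single list per header suffices for NFSv2/v3 and, so far,
for NFSv4 compounds.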



--
Chuck Lever
chuck[dot]lever[at]oracle[dot]com




Patch

diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 693966d..54422f7 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -53,14 +53,6 @@ 
 # define RPCDBG_FACILITY	RPCDBG_TRANS
 #endif
 
-enum rpcrdma_chunktype {
-	rpcrdma_noch = 0,
-	rpcrdma_readch,
-	rpcrdma_areadch,
-	rpcrdma_writech,
-	rpcrdma_replych
-};
-
 #ifdef RPC_DEBUG
 static const char transfertypes[][12] = {
 	"pure inline",	/* no chunks */
@@ -286,6 +278,28 @@  out:
 }
 
 /*
+ * Marshal chunks. This routine returns the header length
+ * consumed by marshaling.
+ *
+ * Returns positive RPC/RDMA header size, or negative errno.
+ */
+
+ssize_t
+rpcrdma_marshal_chunks(struct rpc_rqst *rqst, ssize_t result)
+{
+	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+	struct rpcrdma_msg *headerp = (struct rpcrdma_msg *)req->rl_base;
+
+	if (req->rl_rtype != rpcrdma_noch)
+		result = rpcrdma_create_chunks(rqst, &rqst->rq_snd_buf,
+					       headerp, req->rl_rtype);
+	else if (req->rl_wtype != rpcrdma_noch)
+		result = rpcrdma_create_chunks(rqst, &rqst->rq_rcv_buf,
+					       headerp, req->rl_wtype);
+	return result;
+}
+
+/*
  * Copy write data inline.
  * This function is used for "small" requests. Data which is passed
  * to RPC via iovecs (or page list) is copied directly into the
@@ -377,7 +391,6 @@  rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	char *base;
 	size_t rpclen, padlen;
 	ssize_t hdrlen;
-	enum rpcrdma_chunktype rtype, wtype;
 	struct rpcrdma_msg *headerp;
 
 	/*
@@ -415,13 +428,13 @@  rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	 * into pages; otherwise use reply chunks.
 	 */
 	if (rqst->rq_rcv_buf.buflen <= RPCRDMA_INLINE_READ_THRESHOLD(rqst))
-		wtype = rpcrdma_noch;
+		req->rl_wtype = rpcrdma_noch;
 	else if (rqst->rq_rcv_buf.page_len == 0)
-		wtype = rpcrdma_replych;
+		req->rl_wtype = rpcrdma_replych;
 	else if (rqst->rq_rcv_buf.flags & XDRBUF_READ)
-		wtype = rpcrdma_writech;
+		req->rl_wtype = rpcrdma_writech;
 	else
-		wtype = rpcrdma_replych;
+		req->rl_wtype = rpcrdma_replych;
 
 	/*
 	 * Chunks needed for arguments?
@@ -438,16 +451,16 @@  rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	 * TBD check NFSv4 setacl
 	 */
 	if (rqst->rq_snd_buf.len <= RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
-		rtype = rpcrdma_noch;
+		req->rl_rtype = rpcrdma_noch;
 	else if (rqst->rq_snd_buf.page_len == 0)
-		rtype = rpcrdma_areadch;
+		req->rl_rtype = rpcrdma_areadch;
 	else
-		rtype = rpcrdma_readch;
+		req->rl_rtype = rpcrdma_readch;
 
 	/* The following simplification is not true forever */
-	if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
-		wtype = rpcrdma_noch;
-	if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) {
+	if (req->rl_rtype != rpcrdma_noch && req->rl_wtype == rpcrdma_replych)
+		req->rl_wtype = rpcrdma_noch;
+	if (req->rl_rtype != rpcrdma_noch && req->rl_wtype != rpcrdma_noch) {
 		dprintk("RPC:       %s: cannot marshal multiple chunk lists\n",
 			__func__);
 		return -EIO;
@@ -461,7 +474,7 @@  rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	 * When padding is in use and applies to the transfer, insert
 	 * it and change the message type.
 	 */
-	if (rtype == rpcrdma_noch) {
+	if (req->rl_rtype == rpcrdma_noch) {
 
 		padlen = rpcrdma_inline_pullup(rqst,
 						RPCRDMA_INLINE_PAD_VALUE(rqst));
@@ -476,7 +489,7 @@  rpcrdma_marshal_req(struct rpc_rqst *rqst)
 			headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;
 			headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;
 			hdrlen += 2 * sizeof(u32); /* extra words in padhdr */
-			if (wtype != rpcrdma_noch) {
+			if (req->rl_wtype != rpcrdma_noch) {
 				dprintk("RPC:       %s: invalid chunk list\n",
 					__func__);
 				return -EIO;
@@ -497,30 +510,18 @@  rpcrdma_marshal_req(struct rpc_rqst *rqst)
 			 * on receive. Therefore, we request a reply chunk
 			 * for non-writes wherever feasible and efficient.
 			 */
-			if (wtype == rpcrdma_noch)
-				wtype = rpcrdma_replych;
+			if (req->rl_wtype == rpcrdma_noch)
+				req->rl_wtype = rpcrdma_replych;
 		}
 	}
 
-	/*
-	 * Marshal chunks. This routine will return the header length
-	 * consumed by marshaling.
-	 */
-	if (rtype != rpcrdma_noch) {
-		hdrlen = rpcrdma_create_chunks(rqst,
-					&rqst->rq_snd_buf, headerp, rtype);
-		wtype = rtype;	/* simplify dprintk */
-
-	} else if (wtype != rpcrdma_noch) {
-		hdrlen = rpcrdma_create_chunks(rqst,
-					&rqst->rq_rcv_buf, headerp, wtype);
-	}
+	hdrlen = rpcrdma_marshal_chunks(rqst, hdrlen);
 	if (hdrlen < 0)
 		return hdrlen;
 
 	dprintk("RPC:       %s: %s: hdrlen %zd rpclen %zd padlen %zd"
 		" headerp 0x%p base 0x%p lkey 0x%x\n",
-		__func__, transfertypes[wtype], hdrlen, rpclen, padlen,
+		__func__, transfertypes[req->rl_wtype], hdrlen, rpclen, padlen,
 		headerp, base, req->rl_iov.lkey);
 
 	/*
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 4185102..f6d280b 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -597,13 +597,14 @@  xprt_rdma_send_request(struct rpc_task *task)
 	struct rpc_xprt *xprt = rqst->rq_xprt;
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
-	int rc;
+	int rc = 0;
 
-	if (req->rl_niovs == 0) {
+	if (req->rl_niovs == 0)
 		rc = rpcrdma_marshal_req(rqst);
-		if (rc < 0)
-			goto failed_marshal;
-	}
+	else if (r_xprt->rx_ia.ri_memreg_strategy == RPCRDMA_FRMR)
+		rc = rpcrdma_marshal_chunks(rqst, 0);
+	if (rc < 0)
+		goto failed_marshal;
 
 	if (req->rl_reply == NULL) 		/* e.g. reconnection */
 		rpcrdma_recv_buffer_get(req);
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index f3d86b2..c270e59 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -99,6 +99,14 @@  struct rpcrdma_ep {
 #define INIT_CQCOUNT(ep) atomic_set(&(ep)->rep_cqcount, (ep)->rep_cqinit)
 #define DECR_CQCOUNT(ep) atomic_sub_return(1, &(ep)->rep_cqcount)
 
+enum rpcrdma_chunktype {
+	rpcrdma_noch = 0,
+	rpcrdma_readch,
+	rpcrdma_areadch,
+	rpcrdma_writech,
+	rpcrdma_replych
+};
+
 /*
  * struct rpcrdma_rep -- this structure encapsulates state required to recv
  * and complete a reply, asychronously. It needs several pieces of
@@ -192,6 +200,7 @@  struct rpcrdma_req {
 	unsigned int	rl_niovs;	/* 0, 2 or 4 */
 	unsigned int	rl_nchunks;	/* non-zero if chunks */
 	unsigned int	rl_connect_cookie;	/* retry detection */
+	enum rpcrdma_chunktype	rl_rtype, rl_wtype;
 	struct rpcrdma_buffer *rl_buffer; /* home base for this structure */
 	struct rpcrdma_rep	*rl_reply;/* holder for reply buffer */
 	struct rpcrdma_mr_seg rl_segments[RPCRDMA_MAX_SEGS];/* chunk segments */
@@ -347,6 +356,7 @@  void rpcrdma_reply_handler(struct rpcrdma_rep *);
 /*
  * RPC/RDMA protocol calls - xprtrdma/rpc_rdma.c
  */
+ssize_t rpcrdma_marshal_chunks(struct rpc_rqst *, ssize_t);
 int rpcrdma_marshal_req(struct rpc_rqst *);
 size_t rpcrdma_max_payload(struct rpcrdma_xprt *);