[v1,05/14] svcrdma: Introduce local rdma_rw API helpers

Message ID 20170316155306.4482.68041.stgit@klimt.1015granger.net
State New

Commit Message

Chuck Lever March 16, 2017, 3:53 p.m. UTC
The plan is to replace the local bespoke code that constructs and
posts RDMA Read and Write Work Requests with calls to the rdma_rw
API. This shares code with other RDMA-enabled ULPs that manages the
gory details of buffer registration and posting Work Requests.

Some design notes:

 o svc_xprt reference counting is modified, since one rdma_rw_ctx
   generates one completion, no matter how many Write WRs are
   posted. To accommodate the new reference counting scheme, a new
   version of svc_rdma_send() is introduced.

 o The structure of RPC-over-RDMA transport headers is flexible,
   allowing multiple segments per Reply with arbitrary alignment.
   Thus I did not take the further step of chaining Write WRs with
   the Send WR containing the RPC Reply message. The Write and Send
   WRs continue to be built by separate pieces of code.

 o The current code builds the transport header as it is
   constructing Write WRs. I've replaced that with marshaling of
   transport header data items in a separate step. This is because
   the exact structure of client-provided segments may not align
   with the components of the server's reply xdr_buf, or the pages
   in the page list. Thus parts of each client-provided segment may
   be written at different points in the send path.

 o Since the Write list and Reply chunk marshaling code is being
   replaced, I took the opportunity to replace some of the C
   structure-based XDR encoding code with more portable code that
   instead uses pointer arithmetic.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 include/linux/sunrpc/svc_rdma.h          |   22 +
 net/sunrpc/xprtrdma/Makefile             |    2 
 net/sunrpc/xprtrdma/svc_rdma_marshal.c   |  114 ++++
 net/sunrpc/xprtrdma/svc_rdma_rw.c        |  785 ++++++++++++++++++++++++++++++
 net/sunrpc/xprtrdma/svc_rdma_sendto.c    |    2 
 net/sunrpc/xprtrdma/svc_rdma_transport.c |    4 
 6 files changed, 925 insertions(+), 4 deletions(-)
 create mode 100644 net/sunrpc/xprtrdma/svc_rdma_rw.c


--
To unsubscribe from this list: send the line "unsubscribe linux-nfs" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Comments

Sagi Grimberg March 22, 2017, 2:17 p.m. UTC | #1
> The plan is to replace the local bespoke code that constructs and
> posts RDMA Read and Write Work Requests with calls to the rdma_rw
> API. This shares code with other RDMA-enabled ULPs that manages the
> gory details of buffer registration and posting Work Requests.
>
> Some design notes:
>
>  o svc_xprt reference counting is modified, since one rdma_rw_ctx
>    generates one completion, no matter how many Write WRs are
>    posted. To accommodate the new reference counting scheme, a new
>    version of svc_rdma_send() is introduced.
>
>  o The structure of RPC-over-RDMA transport headers is flexible,
>    allowing multiple segments per Reply with arbitrary alignment.
>    Thus I did not take the further step of chaining Write WRs with
>    the Send WR containing the RPC Reply message. The Write and Send
>    WRs continue to be built by separate pieces of code.
>
>  o The current code builds the transport header as it is construct-
>    ing Write WRs. I've replaced that with marshaling of transport
>    header data items in a separate step. This is because the exact
>    structure of client-provided segments may not align with the
>    components of the server's reply xdr_buf, or the pages in the
>    page list. Thus parts of each client-provided segment may be
>    written at different points in the send path.
>
>  o Since the Write list and Reply chunk marshaling code is being
>    replaced, I took the opportunity to replace some of the C
>    structure-based XDR encoding code with more portable code that
>    instead uses pointer arithmetic.
>
> Signed-off-by: Chuck Lever <chuck.lever@oracle.com>

To be honest, it's difficult to review this patch, but it's probably
difficult to split it too...

> +
> +/* One Write chunk is copied from Call transport header to Reply
> + * transport header. Each segment's length field is updated to
> + * reflect number of bytes consumed in the segment.
> + *
> + * Returns number of segments in this chunk.
> + */
> +static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
> +					   unsigned int remaining)

Is this only for data-in-reply (send operation)? I don't see why you
would need to modify that for RDMA operations.

Perhaps I'd try to split the data-in-reply code from the actual rdma
conversion. It might make the patch easier to comprehend.

> +{
> +	unsigned int i, nsegs;
> +	u32 seg_len;
> +
> +	/* Write list discriminator */
> +	*dst++ = *src++;

I had to actually run a test program to understand the precedence
here, so parentheses would've helped :)

> diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
> new file mode 100644
> index 0000000..1e76227
> --- /dev/null
> +++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
> @@ -0,0 +1,785 @@
> +/*
> + * Copyright (c) 2016 Oracle.  All rights reserved.
> + *
> + * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
> + */
> +
> +#include <linux/sunrpc/rpc_rdma.h>
> +#include <linux/sunrpc/svc_rdma.h>
> +#include <linux/sunrpc/debug.h>
> +
> +#include <rdma/rw.h>
> +
> +#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
> +
> +/* Each R/W context contains state for one chain of RDMA Read or
> + * Write Work Requests (one RDMA segment to be read from or written
> + * back to the client).
> + *
> + * Each WR chain handles a single contiguous server-side buffer,
> + * because some registration modes (eg. FRWR) do not support a
> + * discontiguous scatterlist.
> + *
> + * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
> + * from a client may contain a unique R_key, so each WR chain moves
> + * one segment (or less) at a time.
> + *
> + * The scatterlist makes this data structure just over 8KB in size
> + * on 4KB-page platforms. As the size of this structure increases
> + * past one page, it becomes more likely that allocating one of these
> + * will fail. Therefore, these contexts are created on demand, but
> + * cached and reused until the controlling svcxprt_rdma is destroyed.
> + */
> +struct svc_rdma_rw_ctxt {
> +	struct list_head	rw_list;
> +	struct ib_cqe		rw_cqe;
> +	struct svcxprt_rdma	*rw_rdma;
> +	int			rw_nents;
> +	int			rw_wrcount;
> +	enum dma_data_direction	rw_dir;
> +	struct svc_rdma_op_ctxt	*rw_readctxt;
> +	struct rdma_rw_ctx	rw_ctx;
> +	struct scatterlist	rw_sg[RPCSVC_MAXPAGES];

Have you considered using sg_table with sg_alloc_table_chained?

See lib/sg_pool.c and nvme-rdma as a consumer.

> +};
> +
> +static struct svc_rdma_rw_ctxt *
> +svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma)
> +{
> +	struct svc_rdma_rw_ctxt *ctxt;
> +
> +	svc_xprt_get(&rdma->sc_xprt);
> +
> +	spin_lock(&rdma->sc_rw_ctxt_lock);
> +	if (list_empty(&rdma->sc_rw_ctxts))
> +		goto out_empty;
> +
> +	ctxt = list_first_entry(&rdma->sc_rw_ctxts,
> +				struct svc_rdma_rw_ctxt, rw_list);
> +	list_del_init(&ctxt->rw_list);
> +	spin_unlock(&rdma->sc_rw_ctxt_lock);
> +
> +out:
> +	ctxt->rw_dir = DMA_NONE;
> +	return ctxt;
> +
> +out_empty:
> +	spin_unlock(&rdma->sc_rw_ctxt_lock);
> +
> +	ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
> +	if (!ctxt) {
> +		svc_xprt_put(&rdma->sc_xprt);
> +		return NULL;
> +	}
> +
> +	ctxt->rw_rdma = rdma;
> +	INIT_LIST_HEAD(&ctxt->rw_list);
> +	sg_init_table(ctxt->rw_sg, ARRAY_SIZE(ctxt->rw_sg));
> +	goto out;
> +}
> +
> +static void svc_rdma_put_rw_ctxt(struct svc_rdma_rw_ctxt *ctxt)
> +{
> +	struct svcxprt_rdma *rdma = ctxt->rw_rdma;
> +
> +	if (ctxt->rw_dir != DMA_NONE)
> +		rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
> +				    rdma->sc_port_num,
> +				    ctxt->rw_sg, ctxt->rw_nents,
> +				    ctxt->rw_dir);
> +

It's a bit odd to see a put_rw_ctxt that also destroys the context,
which doesn't exactly pair with get_rw_ctxt.

Maybe it'd be useful to explicitly do that outside the put.

> +/**
> + * svc_rdma_destroy_rw_ctxts - Free write contexts
> + * @rdma: xprt about to be destroyed
> + *
> + */
> +void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
> +{
> +	struct svc_rdma_rw_ctxt *ctxt;
> +
> +	while (!list_empty(&rdma->sc_rw_ctxts)) {
> +		ctxt = list_first_entry(&rdma->sc_rw_ctxts,
> +					struct svc_rdma_rw_ctxt, rw_list);
> +		list_del(&ctxt->rw_list);
> +		kfree(ctxt);
> +	}
> +}
> +
> +/**
> + * svc_rdma_wc_write_ctx - Handle completion of an RDMA Write ctx
> + * @cq: controlling Completion Queue
> + * @wc: Work Completion
> + *
> + * Invoked in soft IRQ context.
> + *
> + * Assumptions:
> + * - Write completion is not responsible for freeing pages under
> + *   I/O.
> + */
> +static void svc_rdma_wc_write_ctx(struct ib_cq *cq, struct ib_wc *wc)
> +{
> +	struct ib_cqe *cqe = wc->wr_cqe;
> +	struct svc_rdma_rw_ctxt *ctxt =
> +			container_of(cqe, struct svc_rdma_rw_ctxt, rw_cqe);
> +	struct svcxprt_rdma *rdma = ctxt->rw_rdma;
> +
> +	atomic_add(ctxt->rw_wrcount, &rdma->sc_sq_avail);
> +	wake_up(&rdma->sc_send_wait);
> +
> +	if (wc->status != IB_WC_SUCCESS)
> +		goto flush;
> +
> +out:
> +	svc_rdma_put_rw_ctxt(ctxt);
> +	return;
> +
> +flush:
> +	set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
> +	if (wc->status != IB_WC_WR_FLUSH_ERR)
> +		pr_err("svcrdma: write ctx: %s (%u/0x%x)\n",
> +		       ib_wc_status_msg(wc->status),
> +		       wc->status, wc->vendor_err);
> +	goto out;
> +}
> +
> +/**
> + * svc_rdma_wc_read_ctx - Handle completion of an RDMA Read ctx
> + * @cq: controlling Completion Queue
> + * @wc: Work Completion
> + *
> + * Invoked in soft IRQ context.

? in soft IRQ?

> + */
> +static void svc_rdma_wc_read_ctx(struct ib_cq *cq, struct ib_wc *wc)
> +{
> +	struct ib_cqe *cqe = wc->wr_cqe;
> +	struct svc_rdma_rw_ctxt *ctxt =
> +			container_of(cqe, struct svc_rdma_rw_ctxt, rw_cqe);
> +	struct svcxprt_rdma *rdma = ctxt->rw_rdma;
> +	struct svc_rdma_op_ctxt *head;
> +
> +	atomic_add(ctxt->rw_wrcount, &rdma->sc_sq_avail);
> +	wake_up(&rdma->sc_send_wait);
> +
> +	if (wc->status != IB_WC_SUCCESS)
> +		goto flush;
> +
> +	head = ctxt->rw_readctxt;
> +	if (!head)
> +		goto out;
> +
> +	spin_lock(&rdma->sc_rq_dto_lock);
> +	list_add_tail(&head->list, &rdma->sc_read_complete_q);
> +	spin_unlock(&rdma->sc_rq_dto_lock);

Not sure what sc_read_complete_q does... what post processing is
needed for completed reads?

> +/* This function sleeps when the transport's Send Queue is congested.

Is this easy to trigger?
Chuck Lever March 22, 2017, 3:41 p.m. UTC | #2
> On Mar 22, 2017, at 10:17 AM, Sagi Grimberg <sagi@grimberg.me> wrote:
> 
>> 
>> The plan is to replace the local bespoke code that constructs and
>> posts RDMA Read and Write Work Requests with calls to the rdma_rw
>> API. This shares code with other RDMA-enabled ULPs that manages the
>> gory details of buffer registration and posting Work Requests.
>> 
>> Some design notes:
>> 
>> o svc_xprt reference counting is modified, since one rdma_rw_ctx
>>   generates one completion, no matter how many Write WRs are
>>   posted. To accommodate the new reference counting scheme, a new
>>   version of svc_rdma_send() is introduced.
>> 
>> o The structure of RPC-over-RDMA transport headers is flexible,
>>   allowing multiple segments per Reply with arbitrary alignment.
>>   Thus I did not take the further step of chaining Write WRs with
>>   the Send WR containing the RPC Reply message. The Write and Send
>>   WRs continue to be built by separate pieces of code.
>> 
>> o The current code builds the transport header as it is construct-
>>   ing Write WRs. I've replaced that with marshaling of transport
>>   header data items in a separate step. This is because the exact
>>   structure of client-provided segments may not align with the
>>   components of the server's reply xdr_buf, or the pages in the
>>   page list. Thus parts of each client-provided segment may be
>>   written at different points in the send path.
>> 
>> o Since the Write list and Reply chunk marshaling code is being
>>   replaced, I took the opportunity to replace some of the C
>>   structure-based XDR encoding code with more portable code that
>>   instead uses pointer arithmetic.
>> 
>> Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
> 
> To be honest, it's difficult to review this patch, but it's probably
> difficult to split it too...

I agree this is an unfortunately large heap of code. However, I'm
somewhat constrained by Bruce's requirement that I introduce new
code and use it in the same (or an adjacent) patch.

In the next version of this series, the read code (below) has been
removed from this patch, since it's not actually used until
nfsd-rdma-rw-api.


>> +
>> +/* One Write chunk is copied from Call transport header to Reply
>> + * transport header. Each segment's length field is updated to
>> + * reflect number of bytes consumed in the segment.
>> + *
>> + * Returns number of segments in this chunk.
>> + */
>> +static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
>> +					   unsigned int remaining)
> 
> Is this only for data-in-reply (send operation)? I don't see why you
> would need to modify that for RDMA operations.

The sendto path encodes the chunk list in the transport header
as it is posting the RDMA Writes. So this function is used when
there are RDMA Writes before the actual RPC Reply.

I could add this code in a preceding patch, but again, Bruce
likes to see all the code added and used at the same time.
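
For readers following along, here is a minimal userspace sketch of the
per-segment accounting that xdr_encode_write_chunk() performs with its
"remaining" argument. It is not part of the patch: fill_segment_lengths()
is a hypothetical name, and the values are kept in host byte order rather
than as __be32 for simplicity.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical helper, not kernel code. Given the length the client
 * advertised for each segment and the number of bytes the server
 * actually wrote, compute the length to report back per segment.
 */
void fill_segment_lengths(const uint32_t *advertised, uint32_t *reported,
			  size_t nsegs, uint32_t remaining)
{
	size_t i;

	assert(advertised != NULL && reported != NULL);

	for (i = 0; i < nsegs; i++) {
		if (remaining >= advertised[i]) {
			/* entire segment was consumed */
			reported[i] = advertised[i];
			remaining -= advertised[i];
		} else {
			/* segment only partly filled, or not used at all */
			reported[i] = remaining;
			remaining = 0;
		}
	}
}
```

Segments past the point where the payload ends are reported with a
length of zero, which matches how the encoder passes "consumed = 0"
for every chunk after the first.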


> Perhaps I'd try to split the data-in-reply code from the actual rdma
> conversion. It might make the patch easier to comprehend.

I'm not sure what you mean, but it might be that we are using
these terms a little differently.


>> +{
>> +	unsigned int i, nsegs;
>> +	u32 seg_len;
>> +
>> +	/* Write list discriminator */
>> +	*dst++ = *src++;
> 
> I had to actually run a test program to understand the precedence
> here, so parentheses would've helped :)

*dst++ = *src++ is a common idiom in networking code and
XDR encoders/decoders, though it is a little old-fashioned.
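
For anyone unfamiliar with the idiom, a small userspace sketch follows.
It is illustrative only and not part of the patch; copy_words() is a
hypothetical name. Postfix ++ binds more tightly than unary *, so the
statement parses as *(dst++) = *(src++): the word is copied, then both
pointers advance.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical helper, not kernel code. Copies n 32-bit words using
 * the post-increment idiom and returns a pointer one past the last
 * word written.
 */
uint32_t *copy_words(uint32_t *dst, const uint32_t *src, size_t n)
{
	assert(dst != NULL && src != NULL);

	while (n--)
		*dst++ = *src++;	/* same as *(dst++) = *(src++) */

	return dst;
}
```

The source words themselves are never incremented; only the pointers
move.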


>> diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
>> new file mode 100644
>> index 0000000..1e76227
>> --- /dev/null
>> +++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
>> @@ -0,0 +1,785 @@
>> +/*
>> + * Copyright (c) 2016 Oracle.  All rights reserved.
>> + *
>> + * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
>> + */
>> +
>> +#include <linux/sunrpc/rpc_rdma.h>
>> +#include <linux/sunrpc/svc_rdma.h>
>> +#include <linux/sunrpc/debug.h>
>> +
>> +#include <rdma/rw.h>
>> +
>> +#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
>> +
>> +/* Each R/W context contains state for one chain of RDMA Read or
>> + * Write Work Requests (one RDMA segment to be read from or written
>> + * back to the client).
>> + *
>> + * Each WR chain handles a single contiguous server-side buffer,
>> + * because some registration modes (eg. FRWR) do not support a
>> + * discontiguous scatterlist.
>> + *
>> + * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
>> + * from a client may contain a unique R_key, so each WR chain moves
>> + * one segment (or less) at a time.
>> + *
>> + * The scatterlist makes this data structure just over 8KB in size
>> + * on 4KB-page platforms. As the size of this structure increases
>> + * past one page, it becomes more likely that allocating one of these
>> + * will fail. Therefore, these contexts are created on demand, but
>> + * cached and reused until the controlling svcxprt_rdma is destroyed.
>> + */
>> +struct svc_rdma_rw_ctxt {
>> +	struct list_head	rw_list;
>> +	struct ib_cqe		rw_cqe;
>> +	struct svcxprt_rdma	*rw_rdma;
>> +	int			rw_nents;
>> +	int			rw_wrcount;
>> +	enum dma_data_direction	rw_dir;
>> +	struct svc_rdma_op_ctxt	*rw_readctxt;
>> +	struct rdma_rw_ctx	rw_ctx;
>> +	struct scatterlist	rw_sg[RPCSVC_MAXPAGES];
> 
> Have you considered using sg_table with sg_alloc_table_chained?
> 
> See lib/sg_pool.c and nvme-rdma as a consumer.

That might be newer than my patches. I'll have a look.


>> +};
>> +
>> +static struct svc_rdma_rw_ctxt *
>> +svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma)
>> +{
>> +	struct svc_rdma_rw_ctxt *ctxt;
>> +
>> +	svc_xprt_get(&rdma->sc_xprt);
>> +
>> +	spin_lock(&rdma->sc_rw_ctxt_lock);
>> +	if (list_empty(&rdma->sc_rw_ctxts))
>> +		goto out_empty;
>> +
>> +	ctxt = list_first_entry(&rdma->sc_rw_ctxts,
>> +				struct svc_rdma_rw_ctxt, rw_list);
>> +	list_del_init(&ctxt->rw_list);
>> +	spin_unlock(&rdma->sc_rw_ctxt_lock);
>> +
>> +out:
>> +	ctxt->rw_dir = DMA_NONE;
>> +	return ctxt;
>> +
>> +out_empty:
>> +	spin_unlock(&rdma->sc_rw_ctxt_lock);
>> +
>> +	ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
>> +	if (!ctxt) {
>> +		svc_xprt_put(&rdma->sc_xprt);
>> +		return NULL;
>> +	}
>> +
>> +	ctxt->rw_rdma = rdma;
>> +	INIT_LIST_HEAD(&ctxt->rw_list);
>> +	sg_init_table(ctxt->rw_sg, ARRAY_SIZE(ctxt->rw_sg));
>> +	goto out;
>> +}
>> +
>> +static void svc_rdma_put_rw_ctxt(struct svc_rdma_rw_ctxt *ctxt)
>> +{
>> +	struct svcxprt_rdma *rdma = ctxt->rw_rdma;
>> +
>> +	if (ctxt->rw_dir != DMA_NONE)
>> +		rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
>> +				    rdma->sc_port_num,
>> +				    ctxt->rw_sg, ctxt->rw_nents,
>> +				    ctxt->rw_dir);
>> +
> 
> It's a bit odd to see a put_rw_ctxt that also destroys the context,
> which doesn't exactly pair with get_rw_ctxt.
> 
> Maybe it'd be useful to explicitly do that outside the put.

The pairing is not obvious, but it is this:

svc_rdma_send_writes() does the svc_rdma_get_rw_ctxt().

-> If posting succeeds, svc_rdma_wc_write_ctx puts the ctx.

-> If posting fails, svc_rdma_send_writes puts the ctx.

Do you have a suggestion about how this could be more
intuitively documented?

IIRC I combined these because the rdma_rw_ctx_destroy is
always done just before putting the ctx back on the free
list. It eliminates some code duplication, and ensures
the ctx is always ready for the next svc_rdma_get_rw_ctxt().
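
To make the pairing concrete, here is a stripped-down userspace sketch
of a get/put cache where put also resets per-use state before the
object goes back on the free list. It is a sketch under stated
assumptions, not the patch's code: struct ctxt, the "mapped" flag
(standing in for the rdma_rw_ctx_destroy() step) and the cache_*()
names are all hypothetical, and locking is omitted.

```c
#include <assert.h>
#include <stdlib.h>

/* Hypothetical stand-ins for svc_rdma_rw_ctxt and its free list. */
struct ctxt {
	struct ctxt *next;
	int mapped;		/* nonzero while resources are held */
};

struct ctxt_cache {
	struct ctxt *free_list;
};

/* Reuse a cached context if one exists, else allocate a new one. */
struct ctxt *cache_get(struct ctxt_cache *cache)
{
	struct ctxt *c = cache->free_list;

	if (c)
		cache->free_list = c->next;
	else
		c = malloc(sizeof(*c));
	if (!c)
		return NULL;

	c->next = NULL;
	c->mapped = 0;
	return c;
}

/* Release per-use state, then return the context to the free list. */
void cache_put(struct ctxt_cache *cache, struct ctxt *c)
{
	assert(c != NULL);
	c->mapped = 0;		/* stand-in for the teardown step */
	c->next = cache->free_list;
	cache->free_list = c;
}

/* Free every cached context; called when the owner goes away. */
void cache_destroy(struct ctxt_cache *cache)
{
	while (cache->free_list) {
		struct ctxt *c = cache->free_list;

		cache->free_list = c->next;
		free(c);
	}
}
```

Folding the teardown into put means a context taken from the free list
is always in a known state, at the cost of put doing more than its name
suggests.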


>> +/**
>> + * svc_rdma_destroy_rw_ctxts - Free write contexts
>> + * @rdma: xprt about to be destroyed
>> + *
>> + */
>> +void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
>> +{
>> +	struct svc_rdma_rw_ctxt *ctxt;
>> +
>> +	while (!list_empty(&rdma->sc_rw_ctxts)) {
>> +		ctxt = list_first_entry(&rdma->sc_rw_ctxts,
>> +					struct svc_rdma_rw_ctxt, rw_list);
>> +		list_del(&ctxt->rw_list);
>> +		kfree(ctxt);
>> +	}
>> +}
>> +
>> +/**
>> + * svc_rdma_wc_write_ctx - Handle completion of an RDMA Write ctx
>> + * @cq: controlling Completion Queue
>> + * @wc: Work Completion
>> + *
>> + * Invoked in soft IRQ context.
>> + *
>> + * Assumptions:
>> + * - Write completion is not responsible for freeing pages under
>> + *   I/O.
>> + */
>> +static void svc_rdma_wc_write_ctx(struct ib_cq *cq, struct ib_wc *wc)
>> +{
>> +	struct ib_cqe *cqe = wc->wr_cqe;
>> +	struct svc_rdma_rw_ctxt *ctxt =
>> +			container_of(cqe, struct svc_rdma_rw_ctxt, rw_cqe);
>> +	struct svcxprt_rdma *rdma = ctxt->rw_rdma;
>> +
>> +	atomic_add(ctxt->rw_wrcount, &rdma->sc_sq_avail);
>> +	wake_up(&rdma->sc_send_wait);
>> +
>> +	if (wc->status != IB_WC_SUCCESS)
>> +		goto flush;
>> +
>> +out:
>> +	svc_rdma_put_rw_ctxt(ctxt);
>> +	return;
>> +
>> +flush:
>> +	set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
>> +	if (wc->status != IB_WC_WR_FLUSH_ERR)
>> +		pr_err("svcrdma: write ctx: %s (%u/0x%x)\n",
>> +		       ib_wc_status_msg(wc->status),
>> +		       wc->status, wc->vendor_err);
>> +	goto out;
>> +}
>> +
>> +/**
>> + * svc_rdma_wc_read_ctx - Handle completion of an RDMA Read ctx
>> + * @cq: controlling Completion Queue
>> + * @wc: Work Completion
>> + *
>> + * Invoked in soft IRQ context.
> 
> ? in soft IRQ?

Not sure I understand this comment?


> 
>> + */
>> +static void svc_rdma_wc_read_ctx(struct ib_cq *cq, struct ib_wc *wc)
>> +{
>> +	struct ib_cqe *cqe = wc->wr_cqe;
>> +	struct svc_rdma_rw_ctxt *ctxt =
>> +			container_of(cqe, struct svc_rdma_rw_ctxt, rw_cqe);
>> +	struct svcxprt_rdma *rdma = ctxt->rw_rdma;
>> +	struct svc_rdma_op_ctxt *head;
>> +
>> +	atomic_add(ctxt->rw_wrcount, &rdma->sc_sq_avail);
>> +	wake_up(&rdma->sc_send_wait);
>> +
>> +	if (wc->status != IB_WC_SUCCESS)
>> +		goto flush;
>> +
>> +	head = ctxt->rw_readctxt;
>> +	if (!head)
>> +		goto out;
>> +
>> +	spin_lock(&rdma->sc_rq_dto_lock);
>> +	list_add_tail(&head->list, &rdma->sc_read_complete_q);
>> +	spin_unlock(&rdma->sc_rq_dto_lock);
> 
> Not sure what sc_read_complete_q does... what post processing is
> needed for completed reads?

I postponed this until nfsd-rdma-rw-api. Briefly, yes, there's
a lot of work to do when receiving an RPC Call with Read chunks.


>> +/* This function sleeps when the transport's Send Queue is congested.
> 
> Is this easy to trigger?

Not really, but it does happen.

This is one of the problems with RPC-over-RDMA. It's not practical
for the server to estimate its SQ size large enough for every
possible scenario. And, as you observed before, some HCA/RNICs
will have limited SQ capabilities.
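
As a rough illustration of the accounting involved, here is a userspace
sketch using C11 atomics. The posting function is not quoted in this
thread, so the reservation side is an assumption about how a counter
like sc_sq_avail would be used; sq_reserve() and sq_release() are
hypothetical names. The release side mirrors the atomic_add() in the
completion handlers above.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical helper. Try to claim "count" Send Queue entries.
 * Returns false (and backs the claim out) if the queue is too full;
 * the real caller would sleep and retry at that point.
 */
bool sq_reserve(atomic_int *avail, int count)
{
	assert(count > 0);

	/* atomic_fetch_sub() returns the value before subtraction */
	if (atomic_fetch_sub(avail, count) - count >= 0)
		return true;

	atomic_fetch_add(avail, count);
	return false;
}

/* Hypothetical helper. Give entries back once the chain completes. */
void sq_release(atomic_int *avail, int count)
{
	atomic_fetch_add(avail, count);
}
```

Because one completion covers a whole chain of WRs, the release has to
return the full WR count for the chain, not one entry per completion.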

--
Chuck Lever



Chuck Lever March 24, 2017, 10:19 p.m. UTC | #3
> On Mar 22, 2017, at 11:41 AM, Chuck Lever <chuck.lever@oracle.com> wrote:
> 
>> 
>> On Mar 22, 2017, at 10:17 AM, Sagi Grimberg <sagi@grimberg.me> wrote:
>> 
>>> 
>>> The plan is to replace the local bespoke code that constructs and
>>> posts RDMA Read and Write Work Requests with calls to the rdma_rw
>>> API. This shares code with other RDMA-enabled ULPs that manages the
>>> gory details of buffer registration and posting Work Requests.
>>> 
>>> Some design notes:
>>> 
>>> o svc_xprt reference counting is modified, since one rdma_rw_ctx
>>>  generates one completion, no matter how many Write WRs are
>>>  posted. To accommodate the new reference counting scheme, a new
>>>  version of svc_rdma_send() is introduced.
>>> 
>>> o The structure of RPC-over-RDMA transport headers is flexible,
>>>  allowing multiple segments per Reply with arbitrary alignment.
>>>  Thus I did not take the further step of chaining Write WRs with
>>>  the Send WR containing the RPC Reply message. The Write and Send
>>>  WRs continue to be built by separate pieces of code.
>>> 
>>> o The current code builds the transport header as it is construct-
>>>  ing Write WRs. I've replaced that with marshaling of transport
>>>  header data items in a separate step. This is because the exact
>>>  structure of client-provided segments may not align with the
>>>  components of the server's reply xdr_buf, or the pages in the
>>>  page list. Thus parts of each client-provided segment may be
>>>  written at different points in the send path.
>>> 
>>> o Since the Write list and Reply chunk marshaling code is being
>>>  replaced, I took the opportunity to replace some of the C
>>>  structure-based XDR encoding code with more portable code that
>>>  instead uses pointer arithmetic.
>>> 
>>> Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
> 
>>> diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
>>> new file mode 100644
>>> index 0000000..1e76227
>>> --- /dev/null
>>> +++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
>>> @@ -0,0 +1,785 @@
>>> +/*
>>> + * Copyright (c) 2016 Oracle.  All rights reserved.
>>> + *
>>> + * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
>>> + */
>>> +
>>> +#include <linux/sunrpc/rpc_rdma.h>
>>> +#include <linux/sunrpc/svc_rdma.h>
>>> +#include <linux/sunrpc/debug.h>
>>> +
>>> +#include <rdma/rw.h>
>>> +
>>> +#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
>>> +
>>> +/* Each R/W context contains state for one chain of RDMA Read or
>>> + * Write Work Requests (one RDMA segment to be read from or written
>>> + * back to the client).
>>> + *
>>> + * Each WR chain handles a single contiguous server-side buffer,
>>> + * because some registration modes (eg. FRWR) do not support a
>>> + * discontiguous scatterlist.
>>> + *
>>> + * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
>>> + * from a client may contain a unique R_key, so each WR chain moves
>>> + * one segment (or less) at a time.
>>> + *
>>> + * The scatterlist makes this data structure just over 8KB in size
>>> + * on 4KB-page platforms. As the size of this structure increases
>>> + * past one page, it becomes more likely that allocating one of these
>>> + * will fail. Therefore, these contexts are created on demand, but
>>> + * cached and reused until the controlling svcxprt_rdma is destroyed.
>>> + */
>>> +struct svc_rdma_rw_ctxt {
>>> +	struct list_head	rw_list;
>>> +	struct ib_cqe		rw_cqe;
>>> +	struct svcxprt_rdma	*rw_rdma;
>>> +	int			rw_nents;
>>> +	int			rw_wrcount;
>>> +	enum dma_data_direction	rw_dir;
>>> +	struct svc_rdma_op_ctxt	*rw_readctxt;
>>> +	struct rdma_rw_ctx	rw_ctx;
>>> +	struct scatterlist	rw_sg[RPCSVC_MAXPAGES];
>> 
>> Have you considered using sg_table with sg_alloc_table_chained?
>> 
>> See lib/sg_pool.c and nvme-rdma as a consumer.
> 
> That might be newer than my patches. I'll have a look.

I looked at the consumers of sg_alloc_table_chained, and
all these callers are immediately doing ib_dma_map_sg.

Nothing in svc_rdma_rw.c does a DMA map. It relies on
rdma_rw_ctx_init for that, and that API is passed a
scatterlist.

I don't see how I could use sg_alloc_table_chained here,
unless rdma_rw_ctx_init was modified to take a chained
sg_table instead of a scatterlist argument.

I suppose I could convert the client side to use it?
What do you think?


>>> +};
>>> +
>>> +static struct svc_rdma_rw_ctxt *
>>> +svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma)
>>> +{
>>> +	struct svc_rdma_rw_ctxt *ctxt;
>>> +
>>> +	svc_xprt_get(&rdma->sc_xprt);
>>> +
>>> +	spin_lock(&rdma->sc_rw_ctxt_lock);
>>> +	if (list_empty(&rdma->sc_rw_ctxts))
>>> +		goto out_empty;
>>> +
>>> +	ctxt = list_first_entry(&rdma->sc_rw_ctxts,
>>> +				struct svc_rdma_rw_ctxt, rw_list);
>>> +	list_del_init(&ctxt->rw_list);
>>> +	spin_unlock(&rdma->sc_rw_ctxt_lock);
>>> +
>>> +out:
>>> +	ctxt->rw_dir = DMA_NONE;
>>> +	return ctxt;
>>> +
>>> +out_empty:
>>> +	spin_unlock(&rdma->sc_rw_ctxt_lock);
>>> +
>>> +	ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
>>> +	if (!ctxt) {
>>> +		svc_xprt_put(&rdma->sc_xprt);
>>> +		return NULL;
>>> +	}
>>> +
>>> +	ctxt->rw_rdma = rdma;
>>> +	INIT_LIST_HEAD(&ctxt->rw_list);
>>> +	sg_init_table(ctxt->rw_sg, ARRAY_SIZE(ctxt->rw_sg));
>>> +	goto out;
>>> +}
>>> +
>>> +static void svc_rdma_put_rw_ctxt(struct svc_rdma_rw_ctxt *ctxt)
>>> +{
>>> +	struct svcxprt_rdma *rdma = ctxt->rw_rdma;
>>> +
>>> +	if (ctxt->rw_dir != DMA_NONE)
>>> +		rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
>>> +				    rdma->sc_port_num,
>>> +				    ctxt->rw_sg, ctxt->rw_nents,
>>> +				    ctxt->rw_dir);
>>> +
>> 
>> It's a bit odd to see a put_rw_ctxt that also destroys the context,
>> which doesn't exactly pair with get_rw_ctxt.
>> 
>> Maybe it'd be useful to explicitly do that outside the put.
> 
> The pairing is not obvious, but it is this:
> 
> svc_rdma_send_writes() does the svc_rdma_get_rw_ctxt().
> 
> -> If posting succeeds, svc_rdma_wc_write_ctx puts the ctx.
> 
> -> If posting fails, svc_rdma_send_writes puts the ctx.
> 
> Do you have a suggestion about how this could be more
> intuitively documented?
> 
> IIRC I combined these because the rdma_rw_ctx_destroy is
> always done just before putting the ctx back on the free
> list. It eliminates some code duplication, and ensures
> the ctx is always ready for the next svc_rdma_get_rw_ctxt().

I fixed this up, I think it is an improvement. Thanks for
the suggestion.


--
Chuck Lever




Patch

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index f066349..5fc9f6e 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -145,12 +145,15 @@  struct svcxprt_rdma {
 	u32		     sc_max_requests;	/* Max requests */
 	u32		     sc_max_bc_requests;/* Backward credits */
 	int                  sc_max_req_size;	/* Size of each RQ WR buf */
+	u8		     sc_port_num;
 
 	struct ib_pd         *sc_pd;
 
 	spinlock_t	     sc_ctxt_lock;
 	struct list_head     sc_ctxts;
 	int		     sc_ctxt_used;
+	spinlock_t	     sc_rw_ctxt_lock;
+	struct list_head     sc_rw_ctxts;
 	spinlock_t	     sc_map_lock;
 	struct list_head     sc_maps;
 
@@ -209,10 +212,15 @@  extern int svc_rdma_handle_bc_reply(struct rpc_xprt *xprt,
 extern int svc_rdma_xdr_encode_error(struct svcxprt_rdma *,
 				     struct rpcrdma_msg *,
 				     enum rpcrdma_errcode, __be32 *);
-extern void svc_rdma_xdr_encode_write_list(struct rpcrdma_msg *, int);
+extern void svc_rdma_old_encode_write_list(struct rpcrdma_msg *rmsgp,
+					   int chunks);
 extern void svc_rdma_xdr_encode_reply_array(struct rpcrdma_write_array *, int);
 extern void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *, int,
 					    __be32, __be64, u32);
+extern void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch,
+					   unsigned int consumed);
+extern void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch,
+					    unsigned int consumed);
 extern unsigned int svc_rdma_xdr_get_reply_hdr_len(__be32 *rdma_resp);
 
 /* svc_rdma_recvfrom.c */
@@ -224,6 +232,18 @@  extern int rdma_read_chunk_frmr(struct svcxprt_rdma *, struct svc_rqst *,
 				struct svc_rdma_op_ctxt *, int *, u32 *,
 				u32, u32, u64, bool);
 
+/* svc_rdma_rw.c */
+extern void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma);
+extern int svc_rdma_recv_read_list(struct svcxprt_rdma *rdma, __be32 *ch,
+				   struct svc_rdma_op_ctxt *head,
+				   struct svc_rqst *rqstp);
+extern int svc_rdma_send_write_list(struct svcxprt_rdma *rdma,
+				    __be32 *wr_ch, __be32 *rdma_resp,
+				    struct xdr_buf *xdr);
+extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
+				     __be32 *wr_lst, __be32 *rp_ch,
+				     __be32 *rdma_resp, struct xdr_buf *xdr);
+
 /* svc_rdma_sendto.c */
 extern int svc_rdma_map_xdr(struct svcxprt_rdma *, struct xdr_buf *,
 			    struct svc_rdma_req_map *, bool);
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index ef19fa4..c1ae814 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -4,5 +4,5 @@  rpcrdma-y := transport.o rpc_rdma.o verbs.o \
 	fmr_ops.o frwr_ops.o \
 	svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \
 	svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \
-	module.o
+	svc_rdma_rw.o module.o
 rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o
diff --git a/net/sunrpc/xprtrdma/svc_rdma_marshal.c b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
index 1c4aabf..bf3ca7e 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_marshal.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_marshal.c
@@ -217,7 +217,7 @@  unsigned int svc_rdma_xdr_get_reply_hdr_len(__be32 *rdma_resp)
 	return (unsigned long)p - (unsigned long)rdma_resp;
 }
 
-void svc_rdma_xdr_encode_write_list(struct rpcrdma_msg *rmsgp, int chunks)
+void svc_rdma_old_encode_write_list(struct rpcrdma_msg *rmsgp, int chunks)
 {
 	struct rpcrdma_write_array *ary;
 
@@ -255,3 +255,115 @@  void svc_rdma_xdr_encode_array_chunk(struct rpcrdma_write_array *ary,
 	seg->rs_offset = rs_offset;
 	seg->rs_length = cpu_to_be32(write_len);
 }
+
+/* One Write chunk is copied from Call transport header to Reply
+ * transport header. Each segment's length field is updated to
+ * reflect number of bytes consumed in the segment.
+ *
+ * Returns number of segments in this chunk.
+ */
+static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
+					   unsigned int remaining)
+{
+	unsigned int i, nsegs;
+	u32 seg_len;
+
+	/* Write list discriminator */
+	*dst++ = *src++;
+
+	/* number of segments in this chunk */
+	nsegs = be32_to_cpup(src);
+	*dst++ = *src++;
+
+	for (i = nsegs; i; i--) {
+		/* segment's RDMA handle */
+		*dst++ = *src++;
+
+		/* bytes returned in this segment */
+		seg_len = be32_to_cpu(*src);
+		if (remaining >= seg_len) {
+			/* entire segment was consumed */
+			*dst = *src;
+			remaining -= seg_len;
+		} else {
+			/* segment only partly filled */
+			*dst = cpu_to_be32(remaining);
+			remaining = 0;
+		}
+		dst++; src++;
+
+		/* segment's RDMA offset */
+		*dst++ = *src++;
+		*dst++ = *src++;
+	}
+
+	return nsegs;
+}
+
+/**
+ * svc_rdma_xdr_encode_write_list - Encode Reply's Write list
+ * @rdma_resp: Reply's transport header
+ * @wr_ch: Write list in Call's transport header
+ * @consumed: total Write chunk bytes consumed in Reply
+ *
+ * The client provided a Write list in the Call message. Fill in
+ * the segments in the first Write chunk in the Reply's transport
+ * header with the number of bytes consumed in each segment.
+ * Remaining chunks are returned unused.
+ *
+ * Assumptions:
+ *  - Server can consume only one Write chunk.
+ */
+void svc_rdma_xdr_encode_write_list(__be32 *rdma_resp, __be32 *wr_ch,
+				    unsigned int consumed)
+{
+	unsigned int nsegs;
+	__be32 *p, *q;
+
+	/* RPC-over-RDMA V1 replies never have a Read list. */
+	p = rdma_resp + rpcrdma_fixed_maxsz + 1;
+
+	q = wr_ch;
+	while (*q != xdr_zero) {
+		nsegs = xdr_encode_write_chunk(p, q, consumed);
+		q += 2 + nsegs * rpcrdma_segment_maxsz;
+		p += 2 + nsegs * rpcrdma_segment_maxsz;
+		consumed = 0;
+	}
+
+	/* Terminate Write list */
+	*p++ = xdr_zero;
+
+	/* Reply chunk discriminator; may be replaced later */
+	*p = xdr_zero;
+}
+
+/**
+ * svc_rdma_xdr_encode_reply_chunk - Encode Reply's Reply chunk
+ * @rdma_resp: Reply's transport header
+ * @rp_ch: Reply chunk in Call's transport header
+ * @consumed: total Reply chunk bytes consumed in Reply
+ *
+ * The client provided a Reply chunk in the Call message. Fill in
+ * the segments in the Reply chunk in the Reply message with the
+ * number of bytes consumed in each segment.
+ *
+ * Assumptions:
+ * - Reply is smaller than or equal in size to the Reply chunk
+ */
+void svc_rdma_xdr_encode_reply_chunk(__be32 *rdma_resp, __be32 *rp_ch,
+				     unsigned int consumed)
+{
+	__be32 *p;
+
+	/* Find the Reply chunk in the Reply's xprt header.
+	 * RPC-over-RDMA V1 replies never have a Read list.
+	 */
+	p = rdma_resp + rpcrdma_fixed_maxsz + 1;
+
+	/* Skip past Write list */
+	while (*p++ != xdr_zero)
+		p += 1 + be32_to_cpup(p) * rpcrdma_segment_maxsz;
+
+	xdr_encode_write_chunk(p, rp_ch, consumed);
+}
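Reviewer note: the per-segment length update performed by
xdr_encode_write_chunk() can be modelled in user space. The sketch
below is only an illustration under stated assumptions: it uses a
hypothetical host-order struct demo_seg in place of the big-endian
XDR words, and the helper demo_fill_lengths() is not part of the
patch. Unlike the kernel function, it returns the bytes that did not
fit so the overflow case is visible.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical host-order stand-in for one RDMA segment
 * (handle, length, offset); the patch works on XDR words instead.
 */
struct demo_seg {
	uint32_t handle;
	uint32_t length;
	uint64_t offset;
};

/* Copy nsegs segments from src to dst, replacing each length with
 * the number of bytes actually consumed in that segment. Returns
 * the bytes left over that did not fit in any segment.
 */
static uint32_t demo_fill_lengths(struct demo_seg *dst,
				  const struct demo_seg *src,
				  size_t nsegs, uint32_t remaining)
{
	size_t i;

	for (i = 0; i < nsegs; i++) {
		dst[i] = src[i];
		if (remaining >= src[i].length) {
			/* entire segment was consumed */
			remaining -= src[i].length;
		} else {
			/* segment only partly filled, or unused */
			dst[i].length = remaining;
			remaining = 0;
		}
	}
	return remaining;
}
```

With three 4096-byte segments and 5000 bytes consumed, the lengths
written back would be 4096, 904 and 0; handles and offsets are copied
unchanged.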
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
new file mode 100644
index 0000000..1e76227
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -0,0 +1,785 @@ 
+/*
+ * Copyright (c) 2016 Oracle.  All rights reserved.
+ *
+ * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
+ */
+
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/sunrpc/svc_rdma.h>
+#include <linux/sunrpc/debug.h>
+
+#include <rdma/rw.h>
+
+#define RPCDBG_FACILITY	RPCDBG_SVCXPRT
+
+/* Each R/W context contains state for one chain of RDMA Read or
+ * Write Work Requests (one RDMA segment to be read from or written
+ * back to the client).
+ *
+ * Each WR chain handles a single contiguous server-side buffer,
+ * because some registration modes (e.g. FRWR) do not support a
+ * discontiguous scatterlist.
+ *
+ * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
+ * from a client may contain a unique R_key, so each WR chain moves
+ * one segment (or less) at a time.
+ *
+ * The scatterlist makes this data structure just over 8KB in size
+ * on 4KB-page platforms. As the size of this structure increases
+ * past one page, it becomes more likely that allocating one of these
+ * will fail. Therefore, these contexts are created on demand, but
+ * cached and reused until the controlling svcxprt_rdma is destroyed.
+ */
+struct svc_rdma_rw_ctxt {
+	struct list_head	rw_list;
+	struct ib_cqe		rw_cqe;
+	struct svcxprt_rdma	*rw_rdma;
+	int			rw_nents;
+	int			rw_wrcount;
+	enum dma_data_direction	rw_dir;
+	struct svc_rdma_op_ctxt	*rw_readctxt;
+	struct rdma_rw_ctx	rw_ctx;
+	struct scatterlist	rw_sg[RPCSVC_MAXPAGES];
+};
+
+static struct svc_rdma_rw_ctxt *
+svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma)
+{
+	struct svc_rdma_rw_ctxt *ctxt;
+
+	svc_xprt_get(&rdma->sc_xprt);
+
+	spin_lock(&rdma->sc_rw_ctxt_lock);
+	if (list_empty(&rdma->sc_rw_ctxts))
+		goto out_empty;
+
+	ctxt = list_first_entry(&rdma->sc_rw_ctxts,
+				struct svc_rdma_rw_ctxt, rw_list);
+	list_del_init(&ctxt->rw_list);
+	spin_unlock(&rdma->sc_rw_ctxt_lock);
+
+out:
+	ctxt->rw_dir = DMA_NONE;
+	return ctxt;
+
+out_empty:
+	spin_unlock(&rdma->sc_rw_ctxt_lock);
+
+	ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
+	if (!ctxt) {
+		svc_xprt_put(&rdma->sc_xprt);
+		return NULL;
+	}
+
+	ctxt->rw_rdma = rdma;
+	INIT_LIST_HEAD(&ctxt->rw_list);
+	sg_init_table(ctxt->rw_sg, ARRAY_SIZE(ctxt->rw_sg));
+	goto out;
+}
+
+static void svc_rdma_put_rw_ctxt(struct svc_rdma_rw_ctxt *ctxt)
+{
+	struct svcxprt_rdma *rdma = ctxt->rw_rdma;
+
+	if (ctxt->rw_dir != DMA_NONE)
+		rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
+				    rdma->sc_port_num,
+				    ctxt->rw_sg, ctxt->rw_nents,
+				    ctxt->rw_dir);
+
+	spin_lock(&rdma->sc_rw_ctxt_lock);
+	list_add(&ctxt->rw_list, &rdma->sc_rw_ctxts);
+	spin_unlock(&rdma->sc_rw_ctxt_lock);
+
+	svc_xprt_put(&rdma->sc_xprt);
+}
+
+/**
+ * svc_rdma_destroy_rw_ctxts - Free R/W contexts
+ * @rdma: xprt about to be destroyed
+ *
+ */
+void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
+{
+	struct svc_rdma_rw_ctxt *ctxt;
+
+	while (!list_empty(&rdma->sc_rw_ctxts)) {
+		ctxt = list_first_entry(&rdma->sc_rw_ctxts,
+					struct svc_rdma_rw_ctxt, rw_list);
+		list_del(&ctxt->rw_list);
+		kfree(ctxt);
+	}
+}
+
+/**
+ * svc_rdma_wc_write_ctx - Handle completion of an RDMA Write ctx
+ * @cq: controlling Completion Queue
+ * @wc: Work Completion
+ *
+ * Invoked in soft IRQ context.
+ *
+ * Assumptions:
+ * - Write completion is not responsible for freeing pages under
+ *   I/O.
+ */
+static void svc_rdma_wc_write_ctx(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct svc_rdma_rw_ctxt *ctxt =
+			container_of(cqe, struct svc_rdma_rw_ctxt, rw_cqe);
+	struct svcxprt_rdma *rdma = ctxt->rw_rdma;
+
+	atomic_add(ctxt->rw_wrcount, &rdma->sc_sq_avail);
+	wake_up(&rdma->sc_send_wait);
+
+	if (wc->status != IB_WC_SUCCESS)
+		goto flush;
+
+out:
+	svc_rdma_put_rw_ctxt(ctxt);
+	return;
+
+flush:
+	set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+	if (wc->status != IB_WC_WR_FLUSH_ERR)
+		pr_err("svcrdma: write ctx: %s (%u/0x%x)\n",
+		       ib_wc_status_msg(wc->status),
+		       wc->status, wc->vendor_err);
+	goto out;
+}
+
+/**
+ * svc_rdma_wc_read_ctx - Handle completion of an RDMA Read ctx
+ * @cq: controlling Completion Queue
+ * @wc: Work Completion
+ *
+ * Invoked in soft IRQ context.
+ */
+static void svc_rdma_wc_read_ctx(struct ib_cq *cq, struct ib_wc *wc)
+{
+	struct ib_cqe *cqe = wc->wr_cqe;
+	struct svc_rdma_rw_ctxt *ctxt =
+			container_of(cqe, struct svc_rdma_rw_ctxt, rw_cqe);
+	struct svcxprt_rdma *rdma = ctxt->rw_rdma;
+	struct svc_rdma_op_ctxt *head;
+
+	atomic_add(ctxt->rw_wrcount, &rdma->sc_sq_avail);
+	wake_up(&rdma->sc_send_wait);
+
+	if (wc->status != IB_WC_SUCCESS)
+		goto flush;
+
+	head = ctxt->rw_readctxt;
+	if (!head)
+		goto out;
+
+	spin_lock(&rdma->sc_rq_dto_lock);
+	list_add_tail(&head->list, &rdma->sc_read_complete_q);
+	spin_unlock(&rdma->sc_rq_dto_lock);
+	set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
+	svc_xprt_enqueue(&rdma->sc_xprt);
+
+out:
+	svc_rdma_put_rw_ctxt(ctxt);
+	return;
+
+flush:
+	set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+	if (wc->status != IB_WC_WR_FLUSH_ERR)
+		pr_err("svcrdma: read ctx: %s (%u/0x%x)\n",
+		       ib_wc_status_msg(wc->status),
+		       wc->status, wc->vendor_err);
+	goto out;
+}
+
+/* This function sleeps when the transport's Send Queue is congested.
+ *
+ * Assumptions:
+ * - If ib_post_send() succeeds, only one completion is expected,
+ *   even if one or more WRs are flushed. This is true when posting
+ *   an rdma_rw_ctx or when posting a single signaled WR.
+ */
+static int svc_rdma_post_send(struct svcxprt_rdma *rdma,
+			      struct ib_send_wr *first_wr,
+			      int num_wrs)
+{
+	struct svc_xprt *xprt = &rdma->sc_xprt;
+	struct ib_send_wr *bad_wr;
+	int ret;
+
+	do {
+		if ((atomic_sub_return(num_wrs, &rdma->sc_sq_avail) > 0)) {
+			ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
+			if (ret)
+				break;
+			return 0;
+		}
+
+		atomic_inc(&rdma_stat_sq_starve);
+		atomic_add(num_wrs, &rdma->sc_sq_avail);
+		wait_event(rdma->sc_send_wait,
+			   atomic_read(&rdma->sc_sq_avail) > num_wrs);
+	} while (1);
+
+	pr_err("svcrdma: post_send rc=%d; SQ avail=%d/%u\n",
+	       ret, atomic_read(&rdma->sc_sq_avail), rdma->sc_sq_depth);
+	set_bit(XPT_CLOSE, &xprt->xpt_flags);
+
+	/* If even one was posted, there will be a completion. */
+	if (bad_wr != first_wr)
+		return 0;
+
+	atomic_add(num_wrs, &rdma->sc_sq_avail);
+	wake_up(&rdma->sc_send_wait);
+	return -ENOTCONN;
+}
+
+static int svc_rdma_send_write_ctx(struct svcxprt_rdma *rdma,
+				   struct svc_rdma_rw_ctxt *ctxt,
+				   u64 offset, u32 rkey)
+{
+	struct ib_send_wr *first_wr;
+	int ret;
+
+	ret = rdma_rw_ctx_init(&ctxt->rw_ctx,
+			       rdma->sc_qp, rdma->sc_port_num,
+			       ctxt->rw_sg, ctxt->rw_nents,
+			       0, offset, rkey, DMA_TO_DEVICE);
+	if (ret < 0)
+		goto out_init;
+
+	ctxt->rw_wrcount = ret;
+	ctxt->rw_dir = DMA_TO_DEVICE;
+	ctxt->rw_cqe.done = svc_rdma_wc_write_ctx;
+	first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx,
+				   rdma->sc_qp, rdma->sc_port_num,
+				   &ctxt->rw_cqe, NULL);
+	atomic_add(ret, &rdma_stat_write);
+	return svc_rdma_post_send(rdma, first_wr, ret);
+
+out_init:
+	pr_err("svcrdma: rdma_rw_ctx_init failed: %d\n", ret);
+	return -EIO;
+}
+
+/* Common information for sending a Write chunk.
+ *  - Tracks progress of writing one chunk
+ *  - Stores arguments for the SGL constructor function
+ */
+struct svc_rdma_write_info {
+	struct svcxprt_rdma	*wi_rdma;
+
+	/* write state of this chunk */
+	unsigned int		wi_bytes_consumed;
+	unsigned int		wi_seg_off;
+	unsigned int		wi_seg_no;
+	unsigned int		wi_nsegs;
+	__be32			*wi_segs;
+
+	/* SGL constructor arguments */
+	struct xdr_buf		*wi_xdr;
+	unsigned char		*wi_base;
+	unsigned int		wi_next_off;
+};
+
+static void svc_rdma_init_write_info(struct svcxprt_rdma *rdma, __be32 *chunk,
+				     struct svc_rdma_write_info *info)
+{
+	info->wi_rdma = rdma;
+	info->wi_bytes_consumed = 0;
+	info->wi_seg_off = 0;
+	info->wi_seg_no = 0;
+	info->wi_nsegs = be32_to_cpup(chunk + 1);
+	info->wi_segs = chunk + 2;
+}
+
+/* Build and DMA-map an SGL that covers one kvec in an xdr_buf
+ */
+static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
+			       unsigned int len,
+			       struct svc_rdma_rw_ctxt *ctxt)
+{
+	sg_set_buf(&ctxt->rw_sg[0], info->wi_base, len);
+	info->wi_base += len;
+
+	ctxt->rw_nents = 1;
+}
+
+/* Build and DMA-map an SGL that covers the pagelist of an xdr_buf
+ */
+static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
+				    unsigned int remaining,
+				    struct svc_rdma_rw_ctxt *ctxt)
+{
+	unsigned int sge_no, sge_bytes, page_off, page_no;
+	struct scatterlist *sg = ctxt->rw_sg;
+	struct xdr_buf *xdr = info->wi_xdr;
+
+	page_no = (info->wi_next_off + xdr->page_base) >> PAGE_SHIFT;
+	page_off = (info->wi_next_off + xdr->page_base) & ~PAGE_MASK;
+	info->wi_next_off += remaining;
+
+	sge_no = 0;
+	do {
+		sge_bytes = min_t(unsigned int, remaining,
+				  PAGE_SIZE - page_off);
+
+		sg_set_page(&sg[sge_no++], xdr->pages[page_no],
+			    sge_bytes, page_off);
+
+		remaining -= sge_bytes;
+		page_no++;
+		page_off = 0;
+	} while (remaining);
+
+	ctxt->rw_nents = sge_no;
+}
+
+/* Post Write WRs to send a portion of an xdr_buf containing
+ * an RPC Reply.
+ */
+static int
+svc_rdma_send_writes(struct svc_rdma_write_info *info,
+		     void (*constructor)(struct svc_rdma_write_info *info,
+					 unsigned int len,
+					 struct svc_rdma_rw_ctxt *ctxt),
+		     unsigned int total)
+{
+	struct svcxprt_rdma *rdma = info->wi_rdma;
+	unsigned int remaining, seg_no, seg_off;
+	struct svc_rdma_rw_ctxt *ctxt;
+	__be32 *seg;
+	int ret;
+
+	if (total == 0)
+		return 0;
+
+	remaining = total;
+	seg_no = info->wi_seg_no;
+	seg_off = info->wi_seg_off;
+	seg = info->wi_segs + seg_no * rpcrdma_segment_maxsz;
+	do {
+		unsigned int write_len;
+		u32 rs_length, rs_handle;
+		u64 rs_offset;
+
+		if (seg_no >= info->wi_nsegs)
+			goto out_overflow;
+
+		ctxt = svc_rdma_get_rw_ctxt(rdma);
+		if (!ctxt)
+			goto out_noctx;
+
+		rs_handle = be32_to_cpu(*seg++);
+		rs_length = be32_to_cpu(*seg++);
+		seg = xdr_decode_hyper(seg, &rs_offset);
+
+		write_len = min(remaining, rs_length - seg_off);
+		constructor(info, write_len, ctxt);
+		ret = svc_rdma_send_write_ctx(rdma, ctxt, rs_offset + seg_off,
+					      rs_handle);
+		if (ret < 0)
+			goto out_senderr;
+
+		if (write_len == rs_length - seg_off) {
+			seg_no++;
+			seg_off = 0;
+		} else {
+			seg_off += write_len;
+		}
+		remaining -= write_len;
+	} while (remaining);
+
+	info->wi_bytes_consumed += total;
+	info->wi_seg_no = seg_no;
+	info->wi_seg_off = seg_off;
+	return 0;
+
+out_overflow:
+	dprintk("svcrdma: inadequate space in Write chunk (%u)\n",
+		info->wi_nsegs);
+	return -E2BIG;
+
+out_noctx:
+	dprintk("svcrdma: no R/W ctxs available\n");
+	return -ENOMEM;
+
+out_senderr:
+	svc_rdma_put_rw_ctxt(ctxt);
+	pr_err("svcrdma: failed to write pagelist (%d)\n", ret);
+	return ret;
+}
+
+/* Send one of an xdr_buf's kvecs by itself. To send a Reply
+ * chunk, the whole RPC Reply is written back to the client.
+ * This function writes either the head or tail of the xdr_buf
+ * containing the Reply.
+ */
+static int svc_rdma_send_xdr_kvec(struct svc_rdma_write_info *info,
+				  struct kvec *vec)
+{
+	info->wi_base = vec->iov_base;
+
+	return svc_rdma_send_writes(info, svc_rdma_vec_to_sg,
+				    vec->iov_len);
+}
+
+/* Send an xdr_buf's page list by itself. A Write chunk is
+ * just the page list. A Reply chunk is the head, page list,
+ * and tail. This function is shared between the two types
+ * of chunk.
+ */
+static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
+				      struct xdr_buf *xdr)
+{
+	info->wi_xdr = xdr;
+	info->wi_next_off = 0;
+
+	return svc_rdma_send_writes(info, svc_rdma_pagelist_to_sg,
+				    xdr->page_len);
+}
+
+/**
+ * svc_rdma_send_write_list - Write all chunks in the Write list
+ * @rdma: controlling RDMA transport
+ * @wr_ch: Write list provided by client
+ * @rdma_resp: buffer containing transport header under construction
+ * @xdr: xdr_buf carrying an RPC Reply
+ *
+ * Returns:
+ *	%0 if all needed RDMA Writes were posted successfully,
+ *	%-E2BIG if the payload was larger than the Write chunk,
+ *	%-ENOMEM if rdma_rw context pool was exhausted,
+ *	%-ENOTCONN if posting failed (connection is lost),
+ *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
+ *
+ * Assumptions:
+ *  - Only one Write chunk, and it's the xdr_buf's entire pagelist
+ */
+int svc_rdma_send_write_list(struct svcxprt_rdma *rdma,
+			     __be32 *wr_ch, __be32 *rdma_resp,
+			     struct xdr_buf *xdr)
+{
+	struct svc_rdma_write_info info;
+	int ret;
+
+	svc_rdma_init_write_info(rdma, wr_ch, &info);
+
+	ret = svc_rdma_send_xdr_pagelist(&info, xdr);
+
+	svc_rdma_xdr_encode_write_list(rdma_resp, wr_ch,
+				       info.wi_bytes_consumed);
+	return ret;
+}
+
+/**
+ * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
+ * @rdma: controlling RDMA transport
+ * @wr_lst: Write list provided by client
+ * @rp_ch: Reply chunk provided by client
+ * @rdma_resp: buffer containing transport header for Reply
+ * @xdr: xdr_buf carrying an RPC Reply
+ *
+ * Returns:
+ *	%0 if all needed RDMA Writes were posted successfully,
+ *	%-E2BIG if the payload was larger than the Reply chunk,
+ *	%-ENOMEM if rdma_rw context pool was exhausted,
+ *	%-ENOTCONN if posting failed (connection is lost),
+ *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
+ *
+ * Assumptions:
+ *  - The Reply chunk always carries the whole xdr_buf
+ */
+int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *wr_lst,
+			      __be32 *rp_ch, __be32 *rdma_resp,
+			      struct xdr_buf *xdr)
+{
+	struct svc_rdma_write_info info;
+	int ret;
+
+	svc_rdma_init_write_info(rdma, rp_ch, &info);
+
+	ret = svc_rdma_send_xdr_kvec(&info, &xdr->head[0]);
+	if (ret < 0)
+		goto out;
+
+	/* When Write list entries are present, server has already
+	 * transmitted the pagelist payload via a Write chunk. Thus
+	 * we can skip the pagelist here.
+	 */
+	if (!wr_lst) {
+		ret = svc_rdma_send_xdr_pagelist(&info, xdr);
+		if (ret < 0)
+			goto out;
+	}
+
+	ret = svc_rdma_send_xdr_kvec(&info, &xdr->tail[0]);
+
+out:
+	svc_rdma_xdr_encode_reply_chunk(rdma_resp, rp_ch,
+					info.wi_bytes_consumed);
+	return ret;
+}
+
+/* Pull one Read chunk (segment) from the client.
+ *
+ * Returns zero if one or more RDMA Reads have been posted.  Otherwise,
+ * returns a negative errno if there is a Read list present but RDMA
+ * Reads could not be posted.
+ *
+ * For incoming Reads, @rqstp provides a page list containing sink pages.
+ * As pages are prepared for I/O, they are transferred to @head. After
+ * all Reads in the list have completed, svc_rdma_recvfrom builds an
+ * xdr_buf from the page list in @head.
+ *
+ * On entry, *page_no and *page_offset point into the rqstp's page list.
+ * On return, *page_no and *page_offset are updated to point to the next
+ * position in the page list.
+ */
+static int svc_rdma_recv_read_segment(struct svcxprt_rdma *rdma,
+				      struct svc_rdma_op_ctxt *head,
+				      struct svc_rqst *rqstp,
+				      unsigned int *page_no,
+				      unsigned int *page_offset,
+				      u32 rkey, u32 len, u64 offset,
+				      bool last)
+{
+	struct svc_rdma_rw_ctxt *ctxt;
+	struct ib_send_wr *first_wr;
+	unsigned int pg_no, seg_no;
+	u32 pg_off;
+	int ret;
+
+	dprintk("svcrdma: reading segment %u@0x%016llx:0x%08x\n",
+		len, offset, rkey);
+
+	ctxt = svc_rdma_get_rw_ctxt(rdma);
+	if (!ctxt)
+		return -ENOMEM;
+
+	pg_off = *page_offset;
+	pg_no = *page_no;
+	ctxt->rw_nents = PAGE_ALIGN(*page_offset + len) >> PAGE_SHIFT;
+	for (seg_no = 0; seg_no < ctxt->rw_nents; seg_no++) {
+		unsigned int seg_len = min_t(unsigned int, len,
+					     PAGE_SIZE - pg_off);
+
+		head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
+		head->arg.page_len += seg_len;
+		head->arg.len += seg_len;
+		if (!pg_off)
+			head->count++;
+
+		sg_set_page(&ctxt->rw_sg[seg_no], rqstp->rq_arg.pages[pg_no],
+			    seg_len, pg_off);
+
+		rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no + 1];
+		rqstp->rq_next_page = rqstp->rq_respages + 1;
+
+		pg_off += seg_len;
+		if (pg_off == PAGE_SIZE) {
+			pg_off = 0;
+			pg_no++;
+		}
+		len -= seg_len;
+	}
+
+	ret = rdma_rw_ctx_init(&ctxt->rw_ctx,
+			       rdma->sc_qp, rdma->sc_port_num,
+			       ctxt->rw_sg, ctxt->rw_nents,
+			       0, offset, rkey, DMA_FROM_DEVICE);
+	if (ret < 0)
+		goto out_init;
+
+	ctxt->rw_wrcount = ret;
+	ctxt->rw_dir = DMA_FROM_DEVICE;
+	ctxt->rw_cqe.done = svc_rdma_wc_read_ctx;
+	ctxt->rw_readctxt = last ? head : NULL;
+	first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx,
+				   rdma->sc_qp, rdma->sc_port_num,
+				   &ctxt->rw_cqe, NULL);
+	atomic_add(ret, &rdma_stat_read);
+	ret = svc_rdma_post_send(rdma, first_wr, ret);
+	if (ret)
+		goto out_send;
+
+	*page_no = pg_no;
+	*page_offset = pg_off;
+	return 0;
+
+out_init:
+	pr_err("svcrdma: rdma_rw_ctx_init failed: %d\n", ret);
+	ret = -EIO;
+out_send:
+	svc_rdma_put_rw_ctxt(ctxt);
+	return ret;
+}
+
+/* If there was additional inline content, append it to the end of arg.pages.
+ * Tail copy has to be done after the reader function has determined how many
+ * pages were consumed for RDMA Read.
+ */
+static int svc_rdma_copy_tail(struct svc_rqst *rqstp,
+			      struct svc_rdma_op_ctxt *head, u32 position,
+			      unsigned int page_offset, unsigned int page_no)
+{
+	char *srcp, *destp;
+	u32 byte_count;
+
+	srcp = head->arg.head[0].iov_base + position;
+	byte_count = head->arg.head[0].iov_len - position;
+	if (byte_count > PAGE_SIZE) {
+		dprintk("svcrdma: large tail unsupported\n");
+		return 0;
+	}
+
+	/* Fit as much of the tail on the current page as possible */
+	if (page_offset != PAGE_SIZE) {
+		destp = page_address(rqstp->rq_arg.pages[page_no]);
+		destp += page_offset;
+		while (byte_count--) {
+			*destp++ = *srcp++;
+			page_offset++;
+			if (page_offset == PAGE_SIZE && byte_count)
+				goto more;
+		}
+		goto done;
+	}
+
+more:
+	/* Fit the rest on the next page */
+	page_no++;
+	destp = page_address(rqstp->rq_arg.pages[page_no]);
+	while (byte_count--)
+		*destp++ = *srcp++;
+
+	rqstp->rq_respages = &rqstp->rq_arg.pages[page_no + 1];
+	rqstp->rq_next_page = rqstp->rq_respages + 1;
+
+done:
+	byte_count = head->arg.head[0].iov_len - position;
+	head->arg.page_len += byte_count;
+	head->arg.len += byte_count;
+	head->arg.buflen += byte_count;
+	return 1;
+}
+
+static unsigned int svc_rdma_read_chunk_count(__be32 *p)
+{
+	unsigned int nsegs;
+
+	for (nsegs = 0; *p != xdr_zero; p += rpcrdma_readchunk_maxsz)
+		nsegs++;
+	return nsegs;
+}
+
+/**
+ * svc_rdma_recv_read_list - Pull read chunks from the client
+ * @rdma: controlling RDMA transport
+ * @ch: pointer to Read list in the incoming transport header
+ * @head: pages under I/O collect here
+ * @rqstp: set of pages to use as Read sink buffers
+ *
+ * Returns:
+ *	%0 if there is no Read list,
+ *	%1 if all needed RDMA Reads were posted successfully,
+ *	%-EINVAL if the Read chunk data is too large,
+ *	%-ENOMEM if rdma_rw context pool was exhausted,
+ *	%-ENOTCONN if posting failed (connection is lost),
+ *	%-EIO if rdma_rw initialization failed (DMA mapping, etc).
+ *
+ * Assumptions:
+ * - Clients can send multiple Read chunks in a Read list, but
+ *   the chunks must all have the same value in their Position
+ *   field.
+ */
+int svc_rdma_recv_read_list(struct svcxprt_rdma *rdma, __be32 *ch,
+			    struct svc_rdma_op_ctxt *head,
+			    struct svc_rqst *rqstp)
+{
+	unsigned int page_no, page_offset;
+	u32 position;
+	__be32 *p;
+	bool last;
+	int ret;
+
+	p = ch;
+
+	/* Sanity check */
+	if (svc_rdma_read_chunk_count(p++) > RPCSVC_MAXPAGES)
+		return -EINVAL;
+
+	/* "head" keeps all the pages that comprise the request.
+	 */
+	head->arg.head[0] = rqstp->rq_arg.head[0];
+	head->arg.tail[0] = rqstp->rq_arg.tail[0];
+	head->hdr_count = head->count;
+	head->arg.page_base = 0;
+	head->arg.page_len = 0;
+	head->arg.len = rqstp->rq_arg.len;
+	head->arg.buflen = rqstp->rq_arg.buflen;
+
+	/* RDMA_NOMSG: RDMA Read data should land just after Receive data.
+	 */
+	position = be32_to_cpu(*p++);
+	if (position == 0) {
+		head->arg.pages = &head->pages[0];
+		page_offset = head->byte_len;
+	} else {
+		head->arg.pages = &head->pages[head->count];
+		page_offset = 0;
+	}
+
+	/* This server implementation supports only one Read chunk (of one
+	 * or more segments) per message. The list walk is terminated once
+	 * "position" changes.
+	 */
+	page_no = 0;
+	last = false;
+	while (!last) {
+		u32 rs_handle, rs_length;
+		u64 rs_offset;
+
+		rs_handle = be32_to_cpu(*p++);
+		rs_length = be32_to_cpu(*p++);
+		p = xdr_decode_hyper(p, &rs_offset);
+
+		/* Examine next read segment */
+		if (*p == xdr_zero ||
+		    ((*p != xdr_zero) && (be32_to_cpu(*(p + 1)) != position)))
+			last = true;
+
+		ret = svc_rdma_recv_read_segment(rdma, head, rqstp,
+						 &page_no, &page_offset,
+						 rs_handle, rs_length,
+						 rs_offset, last);
+		if (ret < 0)
+			goto out;
+
+		p += 2;
+	}
+
+	/* Read list may need XDR round-up (see RFC 5666, s. 3.7) */
+	if (page_offset & 3) {
+		u32 pad = 4 - (page_offset & 3);
+
+		head->arg.tail[0].iov_len += pad;
+		head->arg.len += pad;
+		head->arg.buflen += pad;
+		page_offset += pad;
+	}
+
+	ret = 1;
+	if (position && position < head->arg.head[0].iov_len)
+		ret = svc_rdma_copy_tail(rqstp, head, position,
+					 page_offset, page_no);
+	head->arg.head[0].iov_len = position;
+	head->position = position;
+
+ out:
+	/* Detach arg pages. svc_recv will replenish them */
+	for (page_no = 0;
+	     &rqstp->rq_pages[page_no] < rqstp->rq_respages; page_no++)
+		rqstp->rq_pages[page_no] = NULL;
+	return ret;
+}
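Reviewer note: the page-splitting arithmetic in
svc_rdma_pagelist_to_sg() can be checked outside the kernel. The
sketch below is a minimal user-space model, assuming a 4096-byte page
and a hypothetical struct demo_sge in place of struct scatterlist;
demo_split_pages() is not part of the patch. It also uses a while
loop and a bounds check, whereas the kernel function relies on its
caller never passing a zero length.

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_PAGE_SIZE 4096u	/* assumption: 4KB pages */

/* Hypothetical stand-in for one scatterlist entry. */
struct demo_sge {
	unsigned int page_no;
	unsigned int offset;
	unsigned int length;
};

/* Split len bytes, starting at byte offset start within a page
 * array, into one entry per page touched. Returns the number of
 * entries written, or 0 if len is zero or max is too small.
 */
static size_t demo_split_pages(unsigned int start, unsigned int len,
			       struct demo_sge *sge, size_t max)
{
	unsigned int page_no = start / DEMO_PAGE_SIZE;
	unsigned int page_off = start % DEMO_PAGE_SIZE;
	size_t n = 0;

	while (len) {
		unsigned int bytes = DEMO_PAGE_SIZE - page_off;

		if (bytes > len)
			bytes = len;
		if (n == max)
			return 0;	/* caller's array is too small */

		sge[n].page_no = page_no++;
		sge[n].offset = page_off;
		sge[n].length = bytes;
		n++;

		len -= bytes;
		page_off = 0;	/* later pages start at offset 0 */
	}
	return n;
}
```

Only the first entry can start at a non-zero page offset, and only
the last can be shorter than what remains of its page.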
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index b4028bc3..e4b8800 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -406,7 +406,7 @@  static int send_write_chunks(struct svcxprt_rdma *xprt,
 		}
 	}
 	/* Update the req with the number of chunks actually used */
-	svc_rdma_xdr_encode_write_list(rdma_resp, chunk_no);
+	svc_rdma_old_encode_write_list(rdma_resp, chunk_no);
 
 	return rqstp->rq_res.page_len;
 
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index b84cd53..90fabad 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -560,6 +560,7 @@  static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
 	INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
 	INIT_LIST_HEAD(&cma_xprt->sc_frmr_q);
 	INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
+	INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts);
 	INIT_LIST_HEAD(&cma_xprt->sc_maps);
 	init_waitqueue_head(&cma_xprt->sc_send_wait);
 
@@ -567,6 +568,7 @@  static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
 	spin_lock_init(&cma_xprt->sc_rq_dto_lock);
 	spin_lock_init(&cma_xprt->sc_frmr_q_lock);
 	spin_lock_init(&cma_xprt->sc_ctxt_lock);
+	spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);
 	spin_lock_init(&cma_xprt->sc_map_lock);
 
 	/*
@@ -998,6 +1000,7 @@  static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 		newxprt, newxprt->sc_cm_id);
 
 	dev = newxprt->sc_cm_id->device;
+	newxprt->sc_port_num = newxprt->sc_cm_id->port_num;
 
 	/* Qualify the transport resource defaults with the
 	 * capabilities of this particular device */
@@ -1247,6 +1250,7 @@  static void __svc_rdma_free(struct work_struct *work)
 	}
 
 	rdma_dealloc_frmr_q(rdma);
+	svc_rdma_destroy_rw_ctxts(rdma);
 	svc_rdma_destroy_ctxts(rdma);
 	svc_rdma_destroy_maps(rdma);