@@ -145,9 +145,10 @@ struct svc_rdma_recv_ctxt {
unsigned int rc_page_count;
unsigned int rc_hdr_count;
u32 rc_inv_rkey;
- struct svc_rdma_payload rc_read_payload;
+ struct svc_rdma_payload *rc_read_payloads;
__be32 *rc_reply_chunk;
unsigned int rc_num_write_chunks;
+ unsigned int rc_cur_payload;
struct page *rc_pages[RPCSVC_MAXPAGES];
};
@@ -193,8 +193,9 @@ svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma)
out:
ctxt->rc_page_count = 0;
- ctxt->rc_read_payload.rp_length = 0;
ctxt->rc_num_write_chunks = 0;
+ ctxt->rc_cur_payload = 0;
+ ctxt->rc_read_payloads = NULL;
return ctxt;
out_empty:
@@ -217,7 +218,8 @@ void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma,
for (i = 0; i < ctxt->rc_page_count; i++)
put_page(ctxt->rc_pages[i]);
-
+ kfree(ctxt->rc_read_payloads);
+ ctxt->rc_read_payloads = NULL;
if (!ctxt->rc_temp)
llist_add(&ctxt->rc_node, &rdma->sc_recv_ctxts);
else
@@ -474,13 +476,13 @@ static bool xdr_check_write_chunk(struct svc_rdma_recv_ctxt *rctxt, u32 maxlen)
*/
static bool xdr_check_write_list(struct svc_rdma_recv_ctxt *rctxt)
{
- u32 chcount = 0;
- __be32 *p;
+ u32 i, segcount, chcount = 0;
+ __be32 *p, *saved;
p = xdr_inline_decode(&rctxt->rc_stream, sizeof(*p));
if (!p)
return false;
- rctxt->rc_read_payload.rp_chunk = p;
+ saved = p;
while (*p != xdr_zero) {
if (!xdr_check_write_chunk(rctxt, MAX_BYTES_WRITE_CHUNK))
return false;
@@ -491,8 +493,22 @@ static bool xdr_check_write_list(struct svc_rdma_recv_ctxt *rctxt)
}
rctxt->rc_num_write_chunks = chcount;
if (!chcount)
- rctxt->rc_read_payload.rp_chunk = NULL;
- return chcount < 2;
+ return true;
+
+ rctxt->rc_read_payloads = kcalloc(chcount,
+ sizeof(struct svc_rdma_payload),
+ GFP_KERNEL);
+ if (!rctxt->rc_read_payloads)
+ return false;
+
+ i = 0;
+ p = saved;
+ while (*p != xdr_zero) {
+ rctxt->rc_read_payloads[i++].rp_chunk = p++;
+ segcount = be32_to_cpup(p++);
+ p += segcount * rpcrdma_segment_maxsz;
+ }
+ return true;
}
/* Sanity check the Reply chunk.
@@ -625,7 +625,7 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
/* Send the page list in the Reply chunk only if the
* client did not provide Write chunks.
*/
- if (!rctxt->rc_num_write_chunks && xdr->page_len) {
+ if (!rctxt->rc_cur_payload && xdr->page_len) {
ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
xdr->page_len);
if (ret < 0)
@@ -447,10 +447,11 @@ static ssize_t svc_rdma_encode_write_chunk(struct svc_rdma_send_ctxt *sctxt,
* @rctxt: Reply context with information about the RPC Call
* @sctxt: Send context for the RPC Reply
*
- * The client provides a Write chunk list in the Call message. Fill
- * in the segments in the first Write chunk in the Reply's transport
- * header with the number of bytes consumed in each segment.
- * Remaining chunks are returned unused.
+ * The client provided a Write list in the Call message. For each
+ * READ payload, fill in the segments in the Write chunks in the
+ * Reply's transport header with the number of bytes consumed
+ * in each segment. Any remaining Write chunks are returned to
+ * the client unused.
*
* Assumptions:
* - Client has provided only one Write chunk
@@ -465,11 +466,12 @@ svc_rdma_encode_write_list(const struct svc_rdma_recv_ctxt *rctxt,
struct svc_rdma_send_ctxt *sctxt)
{
ssize_t len, ret;
+ unsigned int i;
len = 0;
- if (rctxt->rc_num_write_chunks) {
+ for (i = 0; i < rctxt->rc_num_write_chunks; i++) {
ret = svc_rdma_encode_write_chunk(sctxt,
- &rctxt->rc_read_payload);
+ &rctxt->rc_read_payloads[i]);
if (ret < 0)
return ret;
len += ret;
@@ -564,7 +566,7 @@ static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
const struct svc_rdma_recv_ctxt *rctxt,
struct xdr_buf *xdr)
{
- bool read_payload_present = rctxt && rctxt->rc_num_write_chunks;
+ bool read_payload_present = rctxt && rctxt->rc_cur_payload;
int elements;
/* For small messages, copying bytes is cheaper than DMA mapping.
@@ -628,7 +630,7 @@ static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
tailbase = xdr->tail[0].iov_base;
taillen = xdr->tail[0].iov_len;
- if (rctxt && rctxt->rc_num_write_chunks) {
+ if (rctxt && rctxt->rc_cur_payload) {
u32 xdrpad;
xdrpad = xdr_pad_size(xdr->page_len);
@@ -708,12 +710,12 @@ int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
if (ret < 0)
return ret;
- /* If a Write chunk is present, the xdr_buf's page list
+ /* If Write chunks are present, the xdr_buf's page list
* is not included inline. However the Upper Layer may
* have added XDR padding in the tail buffer, and that
* should not be included inline.
*/
- if (rctxt && rctxt->rc_num_write_chunks) {
+ if (rctxt && rctxt->rc_cur_payload) {
base = xdr->tail[0].iov_base;
len = xdr->tail[0].iov_len;
xdr_pad = xdr_pad_size(xdr->page_len);
@@ -951,21 +953,23 @@ int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
struct xdr_buf uninitialized_var(subbuf);
struct svcxprt_rdma *rdma;
+ unsigned int i;
if (!rctxt->rc_num_write_chunks || !length)
return 0;
- /* XXX: Just one READ payload slot for now, since our
- * transport implementation currently supports only one
- * Write chunk.
- */
- rctxt->rc_read_payload.rp_offset = offset;
- rctxt->rc_read_payload.rp_length = length;
+ if (rctxt->rc_cur_payload > rctxt->rc_num_write_chunks)
+ return -ENOENT;
+ i = rctxt->rc_cur_payload++;
+
+ rctxt->rc_read_payloads[i].rp_offset = offset;
+ rctxt->rc_read_payloads[i].rp_length = length;
if (xdr_buf_subsegment(&rqstp->rq_res, &subbuf, offset, length))
return -EMSGSIZE;
rdma = container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
- return svc_rdma_send_write_chunk(rdma, rctxt->rc_read_payload.rp_chunk,
+ return svc_rdma_send_write_chunk(rdma,
+ rctxt->rc_read_payloads[i].rp_chunk,
&subbuf);
}
The Linux NFS/RDMA server implementation currently supports only a single Write chunk per RPC/RDMA request. Requests with more than one are so rare there has never been a strong need to support more. However we are aware of at least one existing NFS client implementation that can generate such requests, so let's dig in. Allocate a data structure at Receive time to keep track of the set of READ payloads and the Write chunks. Signed-off-by: Chuck Lever <chuck.lever@oracle.com> --- include/linux/sunrpc/svc_rdma.h | 3 ++ net/sunrpc/xprtrdma/svc_rdma_recvfrom.c | 30 +++++++++++++++++++----- net/sunrpc/xprtrdma/svc_rdma_rw.c | 2 +- net/sunrpc/xprtrdma/svc_rdma_sendto.c | 38 +++++++++++++++++-------------- 4 files changed, 47 insertions(+), 26 deletions(-)