diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -124,6 +124,12 @@ enum {
#define RPCSVC_MAXPAYLOAD_RDMA RPCSVC_MAXPAYLOAD
+struct svc_rdma_payload {
+ __be32 *ra_chunk;
+ unsigned int ra_offset;
+ unsigned int ra_length;
+};
+
struct svc_rdma_recv_ctxt {
struct llist_node rc_node;
struct list_head rc_list;
@@ -137,10 +143,10 @@ struct svc_rdma_recv_ctxt {
unsigned int rc_page_count;
unsigned int rc_hdr_count;
u32 rc_inv_rkey;
- __be32 *rc_write_list;
+ struct svc_rdma_payload *rc_read_payloads;
__be32 *rc_reply_chunk;
- unsigned int rc_read_payload_offset;
- unsigned int rc_read_payload_length;
+ unsigned int rc_num_write_chunks;
+ unsigned int rc_cur_payload;
struct page *rc_pages[RPCSVC_MAXPAGES];
};
@@ -193,7 +199,8 @@ extern void svc_rdma_sync_reply_hdr(struct svcxprt_rdma *rdma,
unsigned int len);
extern int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *ctxt,
- struct xdr_buf *xdr, __be32 *wr_lst);
+ struct xdr_buf *xdr,
+ unsigned int num_read_payloads);
extern int svc_rdma_sendto(struct svc_rqst *);
extern int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
unsigned int length);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -117,7 +117,7 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
{
int ret;
- ret = svc_rdma_map_reply_msg(rdma, ctxt, &rqst->rq_snd_buf, NULL);
+ ret = svc_rdma_map_reply_msg(rdma, ctxt, &rqst->rq_snd_buf, 0);
if (ret < 0)
return -EIO;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -193,7 +193,9 @@ void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma)
out:
ctxt->rc_page_count = 0;
- ctxt->rc_read_payload_length = 0;
+ ctxt->rc_num_write_chunks = 0;
+ ctxt->rc_cur_payload = 0;
+ ctxt->rc_read_payloads = NULL;
return ctxt;
out_empty:
@@ -216,7 +218,8 @@ void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma,
for (i = 0; i < ctxt->rc_page_count; i++)
put_page(ctxt->rc_pages[i]);
-
+ kfree(ctxt->rc_read_payloads);
+ ctxt->rc_read_payloads = NULL;
if (!ctxt->rc_temp)
llist_add(&ctxt->rc_node, &rdma->sc_recv_ctxts);
else
@@ -452,9 +455,10 @@ static __be32 *xdr_check_write_chunk(__be32 *p, const __be32 *end,
static __be32 *xdr_check_write_list(__be32 *p, const __be32 *end,
struct svc_rdma_recv_ctxt *ctxt)
{
- u32 chcount;
+ u32 chcount, segcount;
+ __be32 *saved = p;
+ int i;
- ctxt->rc_write_list = p;
chcount = 0;
while (*p++ != xdr_zero) {
p = xdr_check_write_chunk(p, end, MAX_BYTES_WRITE_SEG);
@@ -463,8 +467,22 @@ static __be32 *xdr_check_write_list(__be32 *p, const __be32 *end,
if (chcount++ > 1)
return NULL;
}
+ ctxt->rc_num_write_chunks = chcount;
if (!chcount)
- ctxt->rc_write_list = NULL;
+ return p;
+
+ ctxt->rc_read_payloads = kcalloc(chcount,
+ sizeof(struct svc_rdma_payload), GFP_KERNEL);
+ if (!ctxt->rc_read_payloads)
+ return NULL;
+
+ i = 0;
+ p = saved;
+ while (*p++ != xdr_zero) {
+ ctxt->rc_read_payloads[i++].ra_chunk = p - 1;
+ segcount = be32_to_cpup(p++);
+ p += segcount * rpcrdma_segment_maxsz;
+ }
return p;
}
@@ -484,8 +502,9 @@ static __be32 *xdr_check_reply_chunk(__be32 *p, const __be32 *end,
p = xdr_check_write_chunk(p, end, MAX_BYTES_SPECIAL_SEG);
if (!p)
return NULL;
- } else
+ } else {
ctxt->rc_reply_chunk = NULL;
+ }
return p;
}
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -574,7 +574,7 @@ int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
/* Send the page list in the Reply chunk only if the
* client did not provide Write chunks.
*/
- if (!rctxt->rc_write_list && xdr->page_len) {
+ if (!rctxt->rc_num_write_chunks && xdr->page_len) {
ret = svc_rdma_send_xdr_pagelist(info, xdr,
xdr->head[0].iov_len,
xdr->page_len);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -366,10 +366,10 @@ static __be32 *xdr_encode_read_list(__be32 *p)
* transport header. Each segment's length field is updated to
* reflect number of bytes consumed in the segment.
*
- * Returns number of segments in this chunk.
+ * Returns a pointer to the position at which to encode the next chunk.
*/
-static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
- unsigned int remaining)
+static __be32 *xdr_encode_write_chunk(__be32 *dst, __be32 *src,
+ unsigned int length)
{
unsigned int i, nsegs;
u32 seg_len;
@@ -386,15 +386,15 @@ static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
*dst++ = *src++;
/* bytes returned in this segment */
- seg_len = be32_to_cpu(*src);
- if (remaining >= seg_len) {
+ seg_len = be32_to_cpup(src);
+ if (length >= seg_len) {
/* entire segment was consumed */
*dst = *src;
- remaining -= seg_len;
+ length -= seg_len;
} else {
/* segment only partly filled */
- *dst = cpu_to_be32(remaining);
- remaining = 0;
+ *dst = cpu_to_be32(length);
+ length = 0;
}
dst++; src++;
@@ -403,38 +403,25 @@ static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
*dst++ = *src++;
}
- return nsegs;
+ return dst;
}
-/* The client provided a Write list in the Call message. Fill in
- * the segments in the first Write chunk in the Reply's transport
- * header with the number of bytes consumed in each segment.
- * Remaining chunks are returned unused.
- *
- * Assumptions:
- * - Client has provided only one Write chunk
+/* The client provided a Write list in the Call message. For each
+ * READ payload, fill in the segments in the Write chunks in the
+ * Reply's transport header with the number of bytes consumed
+ * in each segment. Any remaining Write chunks are returned to
+ * the client unused.
*/
static __be32 *xdr_encode_write_list(__be32 *p,
const struct svc_rdma_recv_ctxt *rctxt)
{
- unsigned int consumed, nsegs;
- __be32 *q;
-
- q = rctxt->rc_write_list;
- if (!q)
- goto out;
-
- consumed = rctxt->rc_read_payload_length;
- while (*q != xdr_zero) {
- nsegs = xdr_encode_write_chunk(p, q, consumed);
- q += 2 + nsegs * rpcrdma_segment_maxsz;
- p += 2 + nsegs * rpcrdma_segment_maxsz;
- consumed = 0;
- }
+ unsigned int i;
- /* Terminate Write list */
-out:
- *p++ = xdr_zero;
+ for (i = 0; i < rctxt->rc_num_write_chunks; i++)
+ p = xdr_encode_write_chunk(p,
+ rctxt->rc_read_payloads[i].ra_chunk,
+ rctxt->rc_read_payloads[i].ra_length);
+ *p++ = xdr_zero; /* Terminate Write list */
return p;
}
@@ -519,7 +506,7 @@ void svc_rdma_sync_reply_hdr(struct svcxprt_rdma *rdma,
static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *ctxt,
struct xdr_buf *xdr,
- __be32 *wr_lst)
+ unsigned int num_write_chunks)
{
int elements;
@@ -535,7 +522,7 @@ static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
elements = 1;
/* xdr->pages */
- if (!wr_lst) {
+ if (!num_write_chunks) {
unsigned int remaining;
unsigned long pageoff;
@@ -563,7 +550,8 @@ static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
*/
static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *ctxt,
- struct xdr_buf *xdr, __be32 *wr_lst)
+ struct xdr_buf *xdr,
+ unsigned int num_write_chunks)
{
unsigned char *dst, *tailbase;
unsigned int taillen;
@@ -576,7 +564,7 @@ static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
tailbase = xdr->tail[0].iov_base;
taillen = xdr->tail[0].iov_len;
- if (wr_lst) {
+ if (num_write_chunks) {
u32 xdrpad;
xdrpad = xdr_padsize(xdr->page_len);
@@ -619,7 +607,7 @@ static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
* @rdma: controlling transport
* @ctxt: send_ctxt for the Send WR
* @xdr: prepared xdr_buf containing RPC message
- * @wr_lst: pointer to Call header's Write list, or NULL
+ * @num_read_payloads: count of separate READ payloads to send
*
* Load the xdr_buf into the ctxt's sge array, and DMA map each
* element as it is added.
@@ -628,7 +616,7 @@ static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
*/
int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *ctxt,
- struct xdr_buf *xdr, __be32 *wr_lst)
+ struct xdr_buf *xdr, unsigned int num_read_payloads)
{
unsigned int len, remaining;
unsigned long page_off;
@@ -637,8 +625,8 @@ int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
u32 xdr_pad;
int ret;
- if (svc_rdma_pull_up_needed(rdma, ctxt, xdr, wr_lst))
- return svc_rdma_pull_up_reply_msg(rdma, ctxt, xdr, wr_lst);
+ if (svc_rdma_pull_up_needed(rdma, ctxt, xdr, num_read_payloads))
+ return svc_rdma_pull_up_reply_msg(rdma, ctxt, xdr, num_read_payloads);
++ctxt->sc_cur_sge_no;
ret = svc_rdma_dma_map_buf(rdma, ctxt,
@@ -647,12 +635,12 @@ int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
if (ret < 0)
return ret;
- /* If a Write chunk is present, the xdr_buf's page list
+ /* If Write chunks are present, the xdr_buf's page list
* is not included inline. However the Upper Layer may
* have added XDR padding in the tail buffer, and that
* should not be included inline.
*/
- if (wr_lst) {
+ if (num_read_payloads) {
base = xdr->tail[0].iov_base;
len = xdr->tail[0].iov_len;
xdr_pad = xdr_padsize(xdr->page_len);
@@ -741,7 +729,7 @@ static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
if (!rctxt->rc_reply_chunk) {
ret = svc_rdma_map_reply_msg(rdma, sctxt,
&rqstp->rq_res,
- rctxt->rc_write_list);
+ rctxt->rc_cur_payload);
if (ret < 0)
return ret;
}
@@ -885,18 +873,20 @@ int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
{
struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
struct svcxprt_rdma *rdma;
+ unsigned int i;
- if (!rctxt->rc_write_list)
+ if (!rctxt->rc_num_write_chunks)
return 0;
- /* XXX: Just one READ payload slot for now, since our
- * transport implementation currently supports only one
- * Write chunk.
- */
- rctxt->rc_read_payload_offset = offset;
- rctxt->rc_read_payload_length = length;
+ if (rctxt->rc_cur_payload >= rctxt->rc_num_write_chunks)
+ return -ENOENT;
+ i = rctxt->rc_cur_payload++;
+
+ rctxt->rc_read_payloads[i].ra_offset = offset;
+ rctxt->rc_read_payloads[i].ra_length = length;
rdma = container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
- return svc_rdma_send_write_chunk(rdma, rctxt->rc_write_list,
+ return svc_rdma_send_write_chunk(rdma,
+ rctxt->rc_read_payloads[i].ra_chunk,
&rqstp->rq_res, offset, length);
}
The Linux NFS/RDMA server implementation currently supports only a
single Write chunk per RPC/RDMA request. Requests with more than one
are so rare that there has never been a strong need to support more.
However, we are aware of at least one existing NFS client
implementation that can generate such requests, so let's dig in.

Allocate a data structure at Receive time to keep track of the set
of READ payloads and the Write chunks.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 include/linux/sunrpc/svc_rdma.h            |   15 +++-
 net/sunrpc/xprtrdma/svc_rdma_backchannel.c |    2 -
 net/sunrpc/xprtrdma/svc_rdma_recvfrom.c    |   31 +++++++--
 net/sunrpc/xprtrdma/svc_rdma_rw.c          |    2 -
 net/sunrpc/xprtrdma/svc_rdma_sendto.c      |   94 +++++++++++++---------------
 5 files changed, 80 insertions(+), 64 deletions(-)
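
For readers following along, here is a minimal stand-alone sketch of the
two-pass Write list walk that xdr_check_write_list() now performs: pass
one counts (and, in the kernel, range-checks) the chunks, and pass two
records where each chunk begins so that later READ payloads can be
matched to Write chunks by index. This is illustrative user-space code
only, not kernel code: the XDR buffer is hand-built, and payload_rec and
SEGMENT_WORDS are hypothetical stand-ins for struct svc_rdma_payload and
rpcrdma_segment_maxsz.

/*
 * Hypothetical stand-alone model of the two-pass Write list walk.
 * Not kernel code: the buffer is hand-built and the names below are
 * stand-ins for the kernel's data structures.
 */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <arpa/inet.h>		/* htonl()/ntohl() model the be32 helpers */

#define SEGMENT_WORDS 4		/* handle, length, offset (2 words) */

struct payload_rec {		/* models struct svc_rdma_payload */
	uint32_t *chunk;	/* points at the chunk's "present" word */
	uint32_t offset;
	uint32_t length;
};

int main(void)
{
	/* A Write list holding two one-segment chunks, then xdr_zero. */
	uint32_t list[] = {
		htonl(1), htonl(1), htonl(0xabc), htonl(1024), htonl(0), htonl(0),
		htonl(1), htonl(1), htonl(0xdef), htonl(2048), htonl(0), htonl(0),
		htonl(0),
	};
	struct payload_rec *payloads;
	unsigned int chcount = 0, i = 0;
	uint32_t *p = list;

	/* Pass 1: walk the list once to count the chunks. */
	while (ntohl(*p++) != 0) {
		uint32_t segcount = ntohl(*p++);

		p += segcount * SEGMENT_WORDS;
		chcount++;
	}

	/* Pass 2: allocate one slot per chunk, record where each starts. */
	payloads = calloc(chcount, sizeof(*payloads));
	if (!payloads)
		return 1;
	p = list;
	while (ntohl(*p++) != 0) {
		uint32_t segcount;

		payloads[i++].chunk = p - 1;
		segcount = ntohl(*p++);
		p += segcount * SEGMENT_WORDS;
	}

	for (i = 0; i < chcount; i++)
		printf("chunk %u starts at word %td\n",
		       i, payloads[i].chunk - list);
	free(payloads);
	return 0;
}

With this layout, each svc_rdma_read_payload() call fills in the
offset/length pair of the next free slot, and xdr_encode_write_list()
later walks the same array to update every Write chunk in the Reply's
transport header.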