[RFC,9/9] svcrdma: Add data structure to track READ payloads
diff mbox series

Message ID 20200214155029.3848.86626.stgit@klimt.1015granger.net
State New
Headers show
Series
  • Address bugzilla 198053 and more ...
Related show

Commit Message

Chuck Lever Feb. 14, 2020, 3:50 p.m. UTC
The Linux NFS/RDMA server implementation currently supports only a
single Write chunk per RPC/RDMA request. Requests with more than one
are so rare there has never been a strong need to support more.
However we are aware of at least one existing NFS client
implementation that can generate such requests, so let's dig in.

Allocate a data structure at Receive time to keep track of the set
of READ payloads and the Write chunks.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 include/linux/sunrpc/svc_rdma.h            |   15 +++-
 net/sunrpc/xprtrdma/svc_rdma_backchannel.c |    2 -
 net/sunrpc/xprtrdma/svc_rdma_recvfrom.c    |   31 +++++++--
 net/sunrpc/xprtrdma/svc_rdma_rw.c          |    2 -
 net/sunrpc/xprtrdma/svc_rdma_sendto.c      |   94 +++++++++++++---------------
 5 files changed, 80 insertions(+), 64 deletions(-)

Patch
diff mbox series

diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index c1c4563066d9..85e6b281a39b 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -124,6 +124,12 @@  enum {
 
 #define RPCSVC_MAXPAYLOAD_RDMA	RPCSVC_MAXPAYLOAD
 
+struct svc_rdma_payload {
+	__be32			*ra_chunk;
+	unsigned int		ra_offset;
+	unsigned int		ra_length;
+};
+
 struct svc_rdma_recv_ctxt {
 	struct llist_node	rc_node;
 	struct list_head	rc_list;
@@ -137,10 +143,10 @@  struct svc_rdma_recv_ctxt {
 	unsigned int		rc_page_count;
 	unsigned int		rc_hdr_count;
 	u32			rc_inv_rkey;
-	__be32			*rc_write_list;
+	struct svc_rdma_payload	*rc_read_payloads;
 	__be32			*rc_reply_chunk;
-	unsigned int		rc_read_payload_offset;
-	unsigned int		rc_read_payload_length;
+	unsigned int		rc_num_write_chunks;
+	unsigned int		rc_cur_payload;
 	struct page		*rc_pages[RPCSVC_MAXPAGES];
 };
 
@@ -193,7 +199,8 @@  extern void svc_rdma_sync_reply_hdr(struct svcxprt_rdma *rdma,
 				    unsigned int len);
 extern int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
 				  struct svc_rdma_send_ctxt *ctxt,
-				  struct xdr_buf *xdr, __be32 *wr_lst);
+				  struct xdr_buf *xdr,
+				  unsigned int num_read_payloads);
 extern int svc_rdma_sendto(struct svc_rqst *);
 extern int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
 				 unsigned int length);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
index 908e78bb87c6..3b1baf15a1b7 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_backchannel.c
@@ -117,7 +117,7 @@  static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
 {
 	int ret;
 
-	ret = svc_rdma_map_reply_msg(rdma, ctxt, &rqst->rq_snd_buf, NULL);
+	ret = svc_rdma_map_reply_msg(rdma, ctxt, &rqst->rq_snd_buf, 0);
 	if (ret < 0)
 		return -EIO;
 
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 91abe08f7d75..85b8dd8ae772 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -193,7 +193,9 @@  void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma)
 
 out:
 	ctxt->rc_page_count = 0;
-	ctxt->rc_read_payload_length = 0;
+	ctxt->rc_num_write_chunks = 0;
+	ctxt->rc_cur_payload = 0;
+	ctxt->rc_read_payloads = NULL;
 	return ctxt;
 
 out_empty:
@@ -216,7 +218,8 @@  void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma,
 
 	for (i = 0; i < ctxt->rc_page_count; i++)
 		put_page(ctxt->rc_pages[i]);
-
+	kfree(ctxt->rc_read_payloads);
+	ctxt->rc_read_payloads = NULL;
 	if (!ctxt->rc_temp)
 		llist_add(&ctxt->rc_node, &rdma->sc_recv_ctxts);
 	else
@@ -452,9 +455,10 @@  static __be32 *xdr_check_write_chunk(__be32 *p, const __be32 *end,
 static __be32 *xdr_check_write_list(__be32 *p, const __be32 *end,
 				    struct svc_rdma_recv_ctxt *ctxt)
 {
-	u32 chcount;
+	u32 chcount, segcount;
+	__be32 *saved = p;
+	int i;
 
-	ctxt->rc_write_list = p;
 	chcount = 0;
 	while (*p++ != xdr_zero) {
 		p = xdr_check_write_chunk(p, end, MAX_BYTES_WRITE_SEG);
@@ -463,8 +467,22 @@  static __be32 *xdr_check_write_list(__be32 *p, const __be32 *end,
 		if (chcount++ > 1)
 			return NULL;
 	}
+	ctxt->rc_num_write_chunks = chcount;
 	if (!chcount)
-		ctxt->rc_write_list = NULL;
+		return p;
+
+	ctxt->rc_read_payloads = kcalloc(sizeof(struct svc_rdma_payload),
+					 chcount, GFP_KERNEL);
+	if (!ctxt->rc_read_payloads)
+		return NULL;
+
+	i = 0;
+	p = saved;
+	while (*p++ != xdr_zero) {
+		ctxt->rc_read_payloads[i++].ra_chunk = p - 1;
+		segcount = be32_to_cpup(p++);
+		p += segcount * rpcrdma_segment_maxsz;
+	}
 	return p;
 }
 
@@ -484,8 +502,9 @@  static __be32 *xdr_check_reply_chunk(__be32 *p, const __be32 *end,
 		p = xdr_check_write_chunk(p, end, MAX_BYTES_SPECIAL_SEG);
 		if (!p)
 			return NULL;
-	} else
+	} else {
 		ctxt->rc_reply_chunk = NULL;
+	}
 	return p;
 }
 
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
index ca9d414bef9d..740ea4ee251d 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_rw.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -574,7 +574,7 @@  int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
 	/* Send the page list in the Reply chunk only if the
 	 * client did not provide Write chunks.
 	 */
-	if (!rctxt->rc_write_list && xdr->page_len) {
+	if (!rctxt->rc_num_write_chunks && xdr->page_len) {
 		ret = svc_rdma_send_xdr_pagelist(info, xdr,
 						 xdr->head[0].iov_len,
 						 xdr->page_len);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 7349a3f9aa5d..378a24b666bb 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -366,10 +366,10 @@  static __be32 *xdr_encode_read_list(__be32 *p)
  * transport header. Each segment's length field is updated to
  * reflect number of bytes consumed in the segment.
  *
- * Returns number of segments in this chunk.
+ * Returns a pointer to the position to encode the next chunk.
  */
-static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
-					   unsigned int remaining)
+static __be32 *xdr_encode_write_chunk(__be32 *dst, __be32 *src,
+				      unsigned int length)
 {
 	unsigned int i, nsegs;
 	u32 seg_len;
@@ -386,15 +386,15 @@  static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
 		*dst++ = *src++;
 
 		/* bytes returned in this segment */
-		seg_len = be32_to_cpu(*src);
-		if (remaining >= seg_len) {
+		seg_len = be32_to_cpup(src);
+		if (length >= seg_len) {
 			/* entire segment was consumed */
 			*dst = *src;
-			remaining -= seg_len;
+			length -= seg_len;
 		} else {
 			/* segment only partly filled */
-			*dst = cpu_to_be32(remaining);
-			remaining = 0;
+			*dst = cpu_to_be32(length);
+			length = 0;
 		}
 		dst++; src++;
 
@@ -403,38 +403,25 @@  static unsigned int xdr_encode_write_chunk(__be32 *dst, __be32 *src,
 		*dst++ = *src++;
 	}
 
-	return nsegs;
+	return dst;
 }
 
-/* The client provided a Write list in the Call message. Fill in
- * the segments in the first Write chunk in the Reply's transport
- * header with the number of bytes consumed in each segment.
- * Remaining chunks are returned unused.
- *
- * Assumptions:
- *  - Client has provided only one Write chunk
+/* The client provided a Write list in the Call message. For each
+ * READ payload, fill in the segments in the Write chunks in the
+ * Reply's transport header with the number of bytes consumed
+ * in each segment. Any remaining Write chunks are returned to
+ * the client unused.
  */
 static __be32 *xdr_encode_write_list(__be32 *p,
 				     const struct svc_rdma_recv_ctxt *rctxt)
 {
-	unsigned int consumed, nsegs;
-	__be32 *q;
-
-	q = rctxt->rc_write_list;
-	if (!q)
-		goto out;
-
-	consumed = rctxt->rc_read_payload_length;
-	while (*q != xdr_zero) {
-		nsegs = xdr_encode_write_chunk(p, q, consumed);
-		q += 2 + nsegs * rpcrdma_segment_maxsz;
-		p += 2 + nsegs * rpcrdma_segment_maxsz;
-		consumed = 0;
-	}
+	unsigned int i;
 
-	/* Terminate Write list */
-out:
-	*p++ = xdr_zero;
+	for (i = 0; i < rctxt->rc_num_write_chunks; i++)
+		p = xdr_encode_write_chunk(p,
+					rctxt->rc_read_payloads[i].ra_chunk,
+					rctxt->rc_read_payloads[i].ra_length);
+	*p++ = xdr_zero;	/* Terminate Write list */
 	return p;
 }
 
@@ -519,7 +506,7 @@  void svc_rdma_sync_reply_hdr(struct svcxprt_rdma *rdma,
 static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
 				    struct svc_rdma_send_ctxt *ctxt,
 				    struct xdr_buf *xdr,
-				    __be32 *wr_lst)
+				    unsigned int num_write_chunks)
 {
 	int elements;
 
@@ -535,7 +522,7 @@  static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
 	elements = 1;
 
 	/* xdr->pages */
-	if (!wr_lst) {
+	if (!num_write_chunks) {
 		unsigned int remaining;
 		unsigned long pageoff;
 
@@ -563,7 +550,8 @@  static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
  */
 static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
 				      struct svc_rdma_send_ctxt *ctxt,
-				      struct xdr_buf *xdr, __be32 *wr_lst)
+				      struct xdr_buf *xdr,
+				      unsigned int num_write_chunks)
 {
 	unsigned char *dst, *tailbase;
 	unsigned int taillen;
@@ -576,7 +564,7 @@  static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
 
 	tailbase = xdr->tail[0].iov_base;
 	taillen = xdr->tail[0].iov_len;
-	if (wr_lst) {
+	if (num_write_chunks) {
 		u32 xdrpad;
 
 		xdrpad = xdr_padsize(xdr->page_len);
@@ -619,7 +607,7 @@  static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
  * @rdma: controlling transport
  * @ctxt: send_ctxt for the Send WR
  * @xdr: prepared xdr_buf containing RPC message
- * @wr_lst: pointer to Call header's Write list, or NULL
+ * @num_read_payloads: count of separate READ payloads to send
  *
  * Load the xdr_buf into the ctxt's sge array, and DMA map each
  * element as it is added.
@@ -628,7 +616,7 @@  static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
  */
 int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
 			   struct svc_rdma_send_ctxt *ctxt,
-			   struct xdr_buf *xdr, __be32 *wr_lst)
+			   struct xdr_buf *xdr, unsigned int num_read_payloads)
 {
 	unsigned int len, remaining;
 	unsigned long page_off;
@@ -637,8 +625,8 @@  int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
 	u32 xdr_pad;
 	int ret;
 
-	if (svc_rdma_pull_up_needed(rdma, ctxt, xdr, wr_lst))
-		return svc_rdma_pull_up_reply_msg(rdma, ctxt, xdr, wr_lst);
+	if (svc_rdma_pull_up_needed(rdma, ctxt, xdr, num_read_payloads))
+		return svc_rdma_pull_up_reply_msg(rdma, ctxt, xdr, num_read_payloads);
 
 	++ctxt->sc_cur_sge_no;
 	ret = svc_rdma_dma_map_buf(rdma, ctxt,
@@ -647,12 +635,12 @@  int svc_rdma_map_reply_msg(struct svcxprt_rdma *rdma,
 	if (ret < 0)
 		return ret;
 
-	/* If a Write chunk is present, the xdr_buf's page list
+	/* If Write chunks are present, the xdr_buf's page list
 	 * is not included inline. However the Upper Layer may
 	 * have added XDR padding in the tail buffer, and that
 	 * should not be included inline.
 	 */
-	if (wr_lst) {
+	if (num_read_payloads) {
 		base = xdr->tail[0].iov_base;
 		len = xdr->tail[0].iov_len;
 		xdr_pad = xdr_padsize(xdr->page_len);
@@ -741,7 +729,7 @@  static int svc_rdma_send_reply_msg(struct svcxprt_rdma *rdma,
 	if (!rctxt->rc_reply_chunk) {
 		ret = svc_rdma_map_reply_msg(rdma, sctxt,
 					     &rqstp->rq_res,
-					     rctxt->rc_write_list);
+					     rctxt->rc_cur_payload);
 		if (ret < 0)
 			return ret;
 	}
@@ -885,18 +873,20 @@  int svc_rdma_read_payload(struct svc_rqst *rqstp, unsigned int offset,
 {
 	struct svc_rdma_recv_ctxt *rctxt = rqstp->rq_xprt_ctxt;
 	struct svcxprt_rdma *rdma;
+	unsigned int i;
 
-	if (!rctxt->rc_write_list)
+	if (!rctxt->rc_num_write_chunks)
 		return 0;
 
-	/* XXX: Just one READ payload slot for now, since our
-	 * transport implementation currently supports only one
-	 * Write chunk.
-	 */
-	rctxt->rc_read_payload_offset = offset;
-	rctxt->rc_read_payload_length = length;
+	if (rctxt->rc_cur_payload > rctxt->rc_num_write_chunks)
+		return -ENOENT;
+	i = rctxt->rc_cur_payload++;
+
+	rctxt->rc_read_payloads[i].ra_offset = offset;
+	rctxt->rc_read_payloads[i].ra_length = length;
 
 	rdma = container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
-	return svc_rdma_send_write_chunk(rdma, rctxt->rc_write_list,
+	return svc_rdma_send_write_chunk(rdma,
+					 rctxt->rc_read_payloads[i].ra_chunk,
 					 &rqstp->rq_res, offset, length);
 }