diff mbox series

[RFC,09/11] svcrdma: Support multiple READ payloads when pulling up

Message ID 20200319152110.16298.9209.stgit@klimt.1015granger.net (mailing list archive)
State RFC
Headers show
Series Linux NFS server support for multiple Write chunks | expand

Commit Message

Chuck Lever III March 19, 2020, 3:21 p.m. UTC
When counting the number of SGEs needed to construct a Send request,
do not count READ payloads. And, when copying the Reply message into
the pull-up buffer, READ payloads are not to be copied to the Send
buffer.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 include/trace/events/rpcrdma.h        |   15 ++-
 net/sunrpc/xprtrdma/svc_rdma_sendto.c |  186 ++++++++++++++++++++-------------
 2 files changed, 121 insertions(+), 80 deletions(-)
diff mbox series

Patch

diff --git a/include/trace/events/rpcrdma.h b/include/trace/events/rpcrdma.h
index 9238d233f8cf..ff2d943d1540 100644
--- a/include/trace/events/rpcrdma.h
+++ b/include/trace/events/rpcrdma.h
@@ -1641,20 +1641,25 @@  TRACE_EVENT(svcrdma_dma_map_rwctx,
 
 TRACE_EVENT(svcrdma_send_pullup,
 	TP_PROTO(
-		unsigned int len
+		unsigned int hdrlen,
+		unsigned int msglen
 	),
 
-	TP_ARGS(len),
+	TP_ARGS(hdrlen, msglen),
 
 	TP_STRUCT__entry(
-		__field(unsigned int, len)
+		__field(unsigned int, hdrlen)
+		__field(unsigned int, msglen)
 	),
 
 	TP_fast_assign(
-		__entry->len = len;
+		__entry->hdrlen = hdrlen;
+		__entry->msglen = msglen;
 	),
 
-	TP_printk("len=%u", __entry->len)
+	TP_printk("hdr=%u msg=%u (total %u)",
+		__entry->hdrlen, __entry->msglen,
+		__entry->hdrlen + __entry->msglen)
 );
 
 TRACE_EVENT(svcrdma_send_failed,
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 85c91d0debb4..037be0bdb557 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -622,6 +622,45 @@  static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
 				     offset_in_page(base), len);
 }
 
+struct svc_rdma_pullup_data {
+	u8		*pd_dest;
+	unsigned int	pd_length;
+	unsigned int	pd_num_sges;
+};
+
+/**
+ * svc_rdma_xb_count_sges - Count how many SGEs will be needed
+ * @xdr: xdr_buf containing portion of an RPC message to transmit
+ * @data: pointer to arguments
+ *
+ * Returns:
+ *   Number of SGEs needed to Send the contents of @xdr inline
+ */
+static int svc_rdma_xb_count_sges(const struct xdr_buf *xdr,
+				  void *data)
+{
+	struct svc_rdma_pullup_data *args = data;
+	unsigned int remaining;
+	unsigned long offset;
+
+	if (xdr->head[0].iov_len)
+		++args->pd_num_sges;
+
+	offset = offset_in_page(xdr->page_base);
+	remaining = xdr->page_len;
+	while (remaining) {
+		++args->pd_num_sges;
+		remaining -= min_t(u32, PAGE_SIZE - offset, remaining);
+		offset = 0;
+	}
+
+	if (xdr->tail[0].iov_len)
+		++args->pd_num_sges;
+
+	args->pd_length += xdr->len;
+	return 0;
+}
+
 /**
  * svc_rdma_pull_up_needed - Determine whether to use pull-up
  * @rdma: controlling transport
@@ -630,50 +669,70 @@  static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
  * @xdr: xdr_buf containing RPC message to transmit
  *
  * Returns:
- *	%true if pull-up must be used
- *	%false otherwise
+ *   %true if pull-up must be used
+ *   %false otherwise
  */
-static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
-				    struct svc_rdma_send_ctxt *sctxt,
+static bool svc_rdma_pull_up_needed(const struct svcxprt_rdma *rdma,
+				    const struct svc_rdma_send_ctxt *sctxt,
 				    const struct svc_rdma_recv_ctxt *rctxt,
-				    struct xdr_buf *xdr)
+				    const struct xdr_buf *xdr)
 {
-	bool read_payload_present = rctxt && rctxt->rc_cur_payload;
-	int elements;
+	/* Resources needed for the transport header */
+	struct svc_rdma_pullup_data args = {
+		.pd_length	= sctxt->sc_hdrbuf.len,
+		.pd_num_sges	= 1,
+	};
+	int ret;
 
-	/* For small messages, copying bytes is cheaper than DMA mapping.
-	 */
-	if (!read_payload_present &&
-	    sctxt->sc_hdrbuf.len + xdr->len < RPCRDMA_PULLUP_THRESH)
+	ret = svc_rdma_skip_payloads(xdr, rctxt, svc_rdma_xb_count_sges,
+				     &args);
+	if (ret < 0)
+		return false;
+
+	if (args.pd_length < RPCRDMA_PULLUP_THRESH)
 		return true;
+	return args.pd_num_sges >= rdma->sc_max_send_sges;
+}
 
-	/* Check whether the xdr_buf has more elements than can
-	 * fit in a single RDMA Send.
-	 */
-	/* xdr->head */
-	elements = 1;
-
-	/* xdr->pages */
-	if (!read_payload_present) {
-		unsigned int remaining;
-		unsigned long pageoff;
-
-		pageoff = xdr->page_base & ~PAGE_MASK;
-		remaining = xdr->page_len;
-		while (remaining) {
-			++elements;
-			remaining -= min_t(u32, PAGE_SIZE - pageoff,
-					   remaining);
-			pageoff = 0;
-		}
+/**
+ * svc_rdma_xb_linearize - Copy region of xdr_buf to flat buffer
+ * @xdr: xdr_buf containing portion of an RPC message to copy
+ * @data: pointer to arguments
+ *
+ * Returns:
+ *   Always zero.
+ */
+static int svc_rdma_xb_linearize(const struct xdr_buf *xdr,
+				 void *data)
+{
+	struct svc_rdma_pullup_data *args = data;
+	unsigned int len, remaining;
+	unsigned long pageoff;
+	struct page **ppages;
+
+	if (xdr->head[0].iov_len) {
+		memcpy(args->pd_dest, xdr->head[0].iov_base, xdr->head[0].iov_len);
+		args->pd_dest += xdr->head[0].iov_len;
 	}
 
-	/* xdr->tail */
-	if (xdr->tail[0].iov_len)
-		++elements;
+	ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
+	pageoff = offset_in_page(xdr->page_base);
+	remaining = xdr->page_len;
+	while (remaining) {
+		len = min_t(u32, PAGE_SIZE - pageoff, remaining);
+		memcpy(args->pd_dest, page_address(*ppages), len);
+		remaining -= len;
+		args->pd_dest += len;
+		pageoff = 0;
+	}
+
+	if (xdr->tail[0].iov_len) {
+		memcpy(args->pd_dest, xdr->tail[0].iov_base, xdr->tail[0].iov_len);
+		args->pd_dest += xdr->tail[0].iov_len;
+	}
 
-	/* assume 1 SGE is needed for the transport header */
-	return elements >= rdma->sc_max_send_sges;
+	args->pd_length += xdr->len;
+	return 0;
 }
 
 /**
@@ -686,53 +745,30 @@  static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
  * The device is not capable of sending the reply directly.
  * Assemble the elements of @xdr into the transport header buffer.
  *
- * Returns zero on success, or a negative errno on failure.
+ * Assumptions:
+ *  pull_up_needed has determined that @xdr will fit in the buffer.
+ *
+ * Returns:
+ *   %0 if pull-up was successful
+ *   %-EMSGSIZE if a buffer manipulation problem occurred
  */
-static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
+static int svc_rdma_pull_up_reply_msg(const struct svcxprt_rdma *rdma,
 				      struct svc_rdma_send_ctxt *sctxt,
 				      const struct svc_rdma_recv_ctxt *rctxt,
 				      const struct xdr_buf *xdr)
 {
-	unsigned char *dst, *tailbase;
-	unsigned int taillen;
-
-	dst = sctxt->sc_xprt_buf + sctxt->sc_hdrbuf.len;
-	memcpy(dst, xdr->head[0].iov_base, xdr->head[0].iov_len);
-	dst += xdr->head[0].iov_len;
-
-	tailbase = xdr->tail[0].iov_base;
-	taillen = xdr->tail[0].iov_len;
-	if (rctxt && rctxt->rc_cur_payload) {
-		u32 xdrpad;
-
-		xdrpad = xdr_pad_size(xdr->page_len);
-		if (taillen && xdrpad) {
-			tailbase += xdrpad;
-			taillen -= xdrpad;
-		}
-	} else {
-		unsigned int len, remaining;
-		unsigned long pageoff;
-		struct page **ppages;
-
-		ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
-		pageoff = xdr->page_base & ~PAGE_MASK;
-		remaining = xdr->page_len;
-		while (remaining) {
-			len = min_t(u32, PAGE_SIZE - pageoff, remaining);
-
-			memcpy(dst, page_address(*ppages), len);
-			remaining -= len;
-			dst += len;
-			pageoff = 0;
-		}
-	}
+	struct svc_rdma_pullup_data args = {
+		.pd_dest	= sctxt->sc_xprt_buf + sctxt->sc_hdrbuf.len,
+	};
+	int ret;
 
-	if (taillen)
-		memcpy(dst, tailbase, taillen);
+	ret = svc_rdma_skip_payloads(xdr, rctxt, svc_rdma_xb_linearize,
+				     &args);
+	if (ret < 0)
+		return ret;
 
-	sctxt->sc_sges[0].length += xdr->len;
-	trace_svcrdma_send_pullup(sctxt->sc_sges[0].length);
+	sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len + args.pd_length;
+	trace_svcrdma_send_pullup(sctxt->sc_hdrbuf.len, args.pd_length);
 	return 0;
 }