
[v2,09/19] xprtrdma: Wake RPCs directly in rpcrdma_wc_send path

Message ID: 20190611150846.2877.62136.stgit@manet.1015granger.net
State: New, archived
Series: for-5.3 patches for review

Commit Message

Chuck Lever June 11, 2019, 3:08 p.m. UTC
Eliminate a context switch in the path that handles RPC wake-ups
when a Receive completion has to wait for a Send completion. The
RPCRDMA_REQ_F_TX_RESOURCES wait bit is replaced by a struct kref
shared by the Send and Receive paths; whichever completion drops
the last reference invokes rpcrdma_complete_rqst() directly, in
its own context.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
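
The scheme is easiest to see in miniature. Below is a userspace
analogue of the kref handoff this patch introduces (illustrative
only: C11 atomics and pthreads stand in for struct kref and the IB
completion contexts, and none of these names appear in the kernel
code). Two completion paths share one reference count, and whichever
path drops the last reference runs the completion callback directly,
in its own context, instead of waking a sleeping waiter. Build with
"cc -pthread".

/* Each in-flight completion path holds one reference; the last
 * path to drop its reference performs the completion itself.
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

struct request {
	atomic_int refs;
};

/* Stands in for rpcrdma_complete_rqst(). */
static void complete_request(struct request *req)
{
	printf("completed by the last path to drop a reference\n");
}

static void req_put(struct request *req)
{
	/* fetch_sub returns the old value: 1 means this was the
	 * final reference, so this caller runs the completion.
	 */
	if (atomic_fetch_sub(&req->refs, 1) == 1)
		complete_request(req);
}

/* Stands in for rpcrdma_sendctx_unmap() in Send completion context. */
static void *send_completion(void *arg)
{
	req_put(arg);
	return NULL;
}

/* Stands in for rpcrdma_reply_handler() in Receive completion context. */
static void *receive_handler(void *arg)
{
	req_put(arg);
	return NULL;
}

int main(void)
{
	struct request req;
	pthread_t tsend, trecv;

	/* Two references: kref_init() takes the first for the reply
	 * path; the patch takes the second via kref_get() only when
	 * the Send leaves buffers DMA-mapped (sc_unmap_count != 0).
	 */
	atomic_init(&req.refs, 2);
	pthread_create(&tsend, NULL, send_completion, &req);
	pthread_create(&trecv, NULL, receive_handler, &req);
	pthread_join(tsend, NULL);
	pthread_join(trecv, NULL);
	return 0;
}
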
 net/sunrpc/xprtrdma/rpc_rdma.c  |   61 +++++++++++++++------------------------
 net/sunrpc/xprtrdma/transport.c |   10 ++++++
 net/sunrpc/xprtrdma/verbs.c     |    3 +-
 net/sunrpc/xprtrdma/xprt_rdma.h |   12 ++------
 4 files changed, 36 insertions(+), 50 deletions(-)

Patch

diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 33b6e6a..caf0b19 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -511,6 +511,16 @@  static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
 	return 0;
 }
 
+static void rpcrdma_sendctx_done(struct kref *kref)
+{
+	struct rpcrdma_req *req =
+		container_of(kref, struct rpcrdma_req, rl_kref);
+	struct rpcrdma_rep *rep = req->rl_reply;
+
+	rpcrdma_complete_rqst(rep);
+	rep->rr_rxprt->rx_stats.reply_waits_for_send++;
+}
+
 /**
  * rpcrdma_sendctx_unmap - DMA-unmap Send buffer
  * @sc: sendctx containing SGEs to unmap
@@ -520,6 +530,9 @@  void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
 {
 	struct ib_sge *sge;
 
+	if (!sc->sc_unmap_count)
+		return;
+
 	/* The first two SGEs contain the transport header and
 	 * the inline buffer. These are always left mapped so
 	 * they can be cheaply re-used.
@@ -529,9 +542,7 @@  void rpcrdma_sendctx_unmap(struct rpcrdma_sendctx *sc)
 		ib_dma_unmap_page(sc->sc_device, sge->addr, sge->length,
 				  DMA_TO_DEVICE);
 
-	if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES,
-			       &sc->sc_req->rl_flags))
-		wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
+	kref_put(&sc->sc_req->rl_kref, rpcrdma_sendctx_done);
 }
 
 /* Prepare an SGE for the RPC-over-RDMA transport header.
@@ -666,7 +677,7 @@  static bool rpcrdma_prepare_msg_sges(struct rpcrdma_xprt *r_xprt,
 out:
 	sc->sc_wr.num_sge += sge_no;
 	if (sc->sc_unmap_count)
-		__set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
+		kref_get(&req->rl_kref);
 	return true;
 
 out_regbuf:
@@ -708,7 +719,7 @@  static bool rpcrdma_prepare_msg_sges(struct rpcrdma_xprt *r_xprt,
 	req->rl_sendctx->sc_wr.num_sge = 0;
 	req->rl_sendctx->sc_unmap_count = 0;
 	req->rl_sendctx->sc_req = req;
-	__clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
+	kref_init(&req->rl_kref);
 
 	ret = -EIO;
 	if (!rpcrdma_prepare_hdr_sge(r_xprt, req, hdrlen))
@@ -1268,36 +1279,12 @@  void rpcrdma_complete_rqst(struct rpcrdma_rep *rep)
 	goto out;
 }
 
-/* Ensure that any DMA mapped pages associated with
- * the Send of the RPC Call have been unmapped before
- * allowing the RPC to complete. This protects argument
- * memory not controlled by the RPC client from being
- * re-used before we're done with it.
- */
-static void rpcrdma_release_tx(struct rpcrdma_xprt *r_xprt,
-			       struct rpcrdma_req *req)
+static void rpcrdma_reply_done(struct kref *kref)
 {
-	if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
-		r_xprt->rx_stats.reply_waits_for_send++;
-		out_of_line_wait_on_bit(&req->rl_flags,
-					RPCRDMA_REQ_F_TX_RESOURCES,
-					bit_wait,
-					TASK_UNINTERRUPTIBLE);
-	}
-}
+	struct rpcrdma_req *req =
+		container_of(kref, struct rpcrdma_req, rl_kref);
 
-/**
- * rpcrdma_release_rqst - Release hardware resources
- * @r_xprt: controlling transport instance
- * @req: request with resources to release
- *
- */
-void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
-{
-	if (!list_empty(&req->rl_registered))
-		frwr_unmap_sync(r_xprt, req);
-
-	rpcrdma_release_tx(r_xprt, req);
+	rpcrdma_complete_rqst(req->rl_reply);
 }
 
 /**
@@ -1367,13 +1354,11 @@  void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 
 	if (rep->rr_wc_flags & IB_WC_WITH_INVALIDATE)
 		frwr_reminv(rep, &req->rl_registered);
-	if (!list_empty(&req->rl_registered)) {
+	if (!list_empty(&req->rl_registered))
 		frwr_unmap_async(r_xprt, req);
 		/* LocalInv completion will complete the RPC */
-	} else {
-		rpcrdma_release_tx(r_xprt, req);
-		rpcrdma_complete_rqst(rep);
-	}
+	else
+		kref_put(&req->rl_kref, rpcrdma_reply_done);
 	return;
 
 out_badversion:
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index f84375d..9575f1d 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -618,8 +618,16 @@  static bool rpcrdma_check_regbuf(struct rpcrdma_xprt *r_xprt,
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 
-	rpcrdma_release_rqst(r_xprt, req);
 	trace_xprtrdma_op_free(task, req);
+
+	if (!list_empty(&req->rl_registered))
+		frwr_unmap_sync(r_xprt, req);
+
+	/* XXX: If the RPC is completing because of a signal and
+	 * not because a reply was received, we ought to ensure
+	 * that the Send completion has fired, so that memory
+	 * involved with the Send is not still visible to the NIC.
+	 */
 }
 
 /**
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index c50a4b2..4e22cc2 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -1462,8 +1462,7 @@  static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb)
 	struct ib_send_wr *send_wr = &req->rl_sendctx->sc_wr;
 	int rc;
 
-	if (!ep->rep_send_count ||
-	    test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
+	if (!ep->rep_send_count || kref_read(&req->rl_kref) > 1) {
 		send_wr->send_flags |= IB_SEND_SIGNALED;
 		ep->rep_send_count = ep->rep_send_batch;
 	} else {
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index e465221..5475f0d 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -44,7 +44,8 @@ 
 
 #include <linux/wait.h> 		/* wait_queue_head_t, etc */
 #include <linux/spinlock.h> 		/* spinlock_t, etc */
-#include <linux/atomic.h>			/* atomic_t, etc */
+#include <linux/atomic.h>		/* atomic_t, etc */
+#include <linux/kref.h>			/* struct kref */
 #include <linux/workqueue.h>		/* struct work_struct */
 
 #include <rdma/rdma_cm.h>		/* RDMA connection api */
@@ -329,17 +330,12 @@  struct rpcrdma_req {
 	struct rpcrdma_regbuf	*rl_recvbuf;	/* rq_rcv_buf */
 
 	struct list_head	rl_all;
-	unsigned long		rl_flags;
+	struct kref		rl_kref;
 
 	struct list_head	rl_registered;	/* registered segments */
 	struct rpcrdma_mr_seg	rl_segments[RPCRDMA_MAX_SEGS];
 };
 
-/* rl_flags */
-enum {
-	RPCRDMA_REQ_F_TX_RESOURCES,
-};
-
 static inline struct rpcrdma_req *
 rpcr_to_rdmar(const struct rpc_rqst *rqst)
 {
@@ -584,8 +580,6 @@  int rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
 void rpcrdma_set_max_header_sizes(struct rpcrdma_xprt *);
 void rpcrdma_complete_rqst(struct rpcrdma_rep *rep);
 void rpcrdma_reply_handler(struct rpcrdma_rep *rep);
-void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt,
-			  struct rpcrdma_req *req);
 
 static inline void rpcrdma_set_xdrlen(struct xdr_buf *xdr, size_t len)
 {
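
Two details make the refcount scheme safe. In verbs.c above, a Send
posted while the request's kref count is still greater than one is
always marked IB_SEND_SIGNALED: the extra reference can be dropped
only by the Send completion handler, so that completion must be
guaranteed to fire. The XXX comment in xprt_rdma_free() records the
gap that remains: an RPC torn down by a signal, rather than by a
reply, can reach free before the Send completes, leaving Send buffers
briefly visible to the NIC. One hypothetical way to close that gap,
shown only to make the caveat concrete (rl_wq is an invented
waitqueue; nothing like this is in the patch):

	/* Hypothetical sketch -- rl_wq does not exist here.  Before
	 * the request's buffers are released, wait until the Send
	 * completion has dropped its reference.
	 */
	if (kref_read(&req->rl_kref) > 1)
		wait_event(req->rl_wq, kref_read(&req->rl_kref) == 1);

The matching wake_up(&req->rl_wq) would follow the kref_put() in
rpcrdma_sendctx_unmap().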