diff mbox

[v2,6/7] xprtrdma: Add class for RDMA backwards direction transport

Message ID 20151130222513.13029.52447.stgit@klimt.1015granger.net (mailing list archive)
State New, archived
Headers show

Commit Message

Chuck Lever Nov. 30, 2015, 10:25 p.m. UTC
To support the server-side of an NFSv4.1 backchannel on RDMA
connections, add a transport class that enables backward
direction messages on an existing forward channel connection.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 include/linux/sunrpc/xprt.h              |    1 
 net/sunrpc/xprt.c                        |    1 
 net/sunrpc/xprtrdma/svc_rdma_transport.c |   14 +-
 net/sunrpc/xprtrdma/transport.c          |  230 ++++++++++++++++++++++++++++++
 net/sunrpc/xprtrdma/xprt_rdma.h          |    2 
 5 files changed, 243 insertions(+), 5 deletions(-)


--
To unsubscribe from this list: send the line "unsubscribe linux-nfs" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Comments

Tom Talpey Dec. 1, 2015, 1:38 p.m. UTC | #1
On 11/30/2015 5:25 PM, Chuck Lever wrote:
> To support the server-side of an NFSv4.1 backchannel on RDMA
> connections, add a transport class that enables backward
> direction messages on an existing forward channel connection.
>

> +static void *
> +xprt_rdma_bc_allocate(struct rpc_task *task, size_t size)
> +{
> +	struct rpc_rqst *rqst = task->tk_rqstp;
> +	struct svc_rdma_op_ctxt *ctxt;
> +	struct svcxprt_rdma *rdma;
> +	struct svc_xprt *sxprt;
> +	struct page *page;
> +
> +	if (size > PAGE_SIZE) {
> +		WARN_ONCE(1, "failed to handle buffer allocation (size %zu)\n",
> +			  size);

You may want to add more context to that rather cryptic string, at
least the function name.

Also, it's not exactly "failed to handle", it's an invalid size. Why
would this ever happen? Why even log it?



> +static int
> +rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
> +{
...
> +
> +drop_connection:
> +	dprintk("Failed to send backchannel call\n");

Ditto on the prefix / function context.

And also...

> +	dprintk("%s: sending request with xid: %08x\n",
> +		__func__, be32_to_cpu(rqst->rq_xid));
...
> +	dprintk("RPC:       %s: xprt %p\n", __func__, xprt);

The format strings for many of the dprintk's are somewhat inconsistent.
Some start with "RPC", some with the function name, and some (in other
patches of this series) with "svcrdma". Confusing, and perhaps hard to
pick out of the log.


--
To unsubscribe from this list: send the line "unsubscribe linux-nfs" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Chuck Lever Dec. 1, 2015, 2:36 p.m. UTC | #2
> On Dec 1, 2015, at 8:38 AM, Tom Talpey <tom@talpey.com> wrote:
> 
> On 11/30/2015 5:25 PM, Chuck Lever wrote:
>> To support the server-side of an NFSv4.1 backchannel on RDMA
>> connections, add a transport class that enables backward
>> direction messages on an existing forward channel connection.
>> 
> 
>> +static void *
>> +xprt_rdma_bc_allocate(struct rpc_task *task, size_t size)
>> +{
>> +	struct rpc_rqst *rqst = task->tk_rqstp;
>> +	struct svc_rdma_op_ctxt *ctxt;
>> +	struct svcxprt_rdma *rdma;
>> +	struct svc_xprt *sxprt;
>> +	struct page *page;
>> +
>> +	if (size > PAGE_SIZE) {
>> +		WARN_ONCE(1, "failed to handle buffer allocation (size %zu)\n",
>> +			  size);
> 
> You may want to add more context to that rather cryptic string, at
> least the function name.
> 
> Also, it's not exactly "failed to handle", it's an invalid size. Why
> would this ever happen? Why even log it?
> 
> 
> 
>> +static int
>> +rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
>> +{
> ...
>> +
>> +drop_connection:
>> +	dprintk("Failed to send backchannel call\n");
> 
> Ditto on the prefix / function context.
> 
> And also...
> 
>> +	dprintk("%s: sending request with xid: %08x\n",
>> +		__func__, be32_to_cpu(rqst->rq_xid));
> ...
>> +	dprintk("RPC:       %s: xprt %p\n", __func__, xprt);
> 
> The format strings for many of the dprintk's are somewhat inconsistent.
> Some start with "RPC", some with the function name, and some (in other
> patches of this series) with "svcrdma". Confusing, and perhaps hard to
> pick out of the log.

They do follow a convention: “RPC:      “ is used on the client
side when there is no rpc_task::tk_pid available. “svcrdma” is
used on the server everywhere.

The dprintk changes here were a bit cursory, so I will go back and
review them more carefully.

--
Chuck Lever




--
To unsubscribe from this list: send the line "unsubscribe linux-nfs" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
diff mbox

Patch

diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 69ef5b3..7637ccd 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -85,6 +85,7 @@  struct rpc_rqst {
 	__u32 *			rq_buffer;	/* XDR encode buffer */
 	size_t			rq_callsize,
 				rq_rcvsize;
+	void *			rq_privdata; /* xprt-specific per-rqst data */
 	size_t			rq_xmit_bytes_sent;	/* total bytes sent */
 	size_t			rq_reply_bytes_recvd;	/* total reply bytes */
 							/* received */
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 2e98f4a..37edea6 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -1425,3 +1425,4 @@  void xprt_put(struct rpc_xprt *xprt)
 	if (atomic_dec_and_test(&xprt->count))
 		xprt_destroy(xprt);
 }
+EXPORT_SYMBOL_GPL(xprt_put);
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 643402e..ab5e376 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -1172,12 +1172,14 @@  static void __svc_rdma_free(struct work_struct *work)
 {
 	struct svcxprt_rdma *rdma =
 		container_of(work, struct svcxprt_rdma, sc_work);
-	dprintk("svcrdma: svc_rdma_free(%p)\n", rdma);
+	struct svc_xprt *xprt = &rdma->sc_xprt;
+
+	dprintk("svcrdma: %s(%p)\n", __func__, rdma);
 
 	/* We should only be called from kref_put */
-	if (atomic_read(&rdma->sc_xprt.xpt_ref.refcount) != 0)
+	if (atomic_read(&xprt->xpt_ref.refcount) != 0)
 		pr_err("svcrdma: sc_xprt still in use? (%d)\n",
-		       atomic_read(&rdma->sc_xprt.xpt_ref.refcount));
+		       atomic_read(&xprt->xpt_ref.refcount));
 
 	/*
 	 * Destroy queued, but not processed read completions. Note
@@ -1212,6 +1214,12 @@  static void __svc_rdma_free(struct work_struct *work)
 		pr_err("svcrdma: dma still in use? (%d)\n",
 		       atomic_read(&rdma->sc_dma_used));
 
+	/* Final put of backchannel client transport */
+	if (xprt->xpt_bc_xprt) {
+		xprt_put(xprt->xpt_bc_xprt);
+		xprt->xpt_bc_xprt = NULL;
+	}
+
 	/* De-allocate fastreg mr */
 	rdma_dealloc_frmr_q(rdma);
 
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 8c545f7..db1fd1f 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -51,6 +51,7 @@ 
 #include <linux/slab.h>
 #include <linux/seq_file.h>
 #include <linux/sunrpc/addr.h>
+#include <linux/sunrpc/svc_rdma.h>
 
 #include "xprt_rdma.h"
 
@@ -148,7 +149,8 @@  static struct ctl_table sunrpc_table[] = {
 #define RPCRDMA_MAX_REEST_TO	(30U * HZ)
 #define RPCRDMA_IDLE_DISC_TO	(5U * 60 * HZ)
 
-static struct rpc_xprt_ops xprt_rdma_procs;	/* forward reference */
+static struct rpc_xprt_ops xprt_rdma_procs;
+static struct rpc_xprt_ops xprt_rdma_bc_procs;
 
 static void
 xprt_rdma_format_addresses4(struct rpc_xprt *xprt, struct sockaddr *sap)
@@ -499,7 +501,7 @@  xprt_rdma_allocate(struct rpc_task *task, size_t size)
 	if (req == NULL)
 		return NULL;
 
-	flags = GFP_NOIO | __GFP_NOWARN;
+	flags = RPCRDMA_DEF_GFP;
 	if (RPC_IS_SWAPPER(task))
 		flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;
 
@@ -684,6 +686,195 @@  xprt_rdma_disable_swap(struct rpc_xprt *xprt)
 {
 }
 
+/* Server-side transport endpoint wants a whole page for its send
+ * buffer. The client RPC code constructs the RPC header in this
+ * buffer before it invokes ->send_request.
+ */
+static void *
+xprt_rdma_bc_allocate(struct rpc_task *task, size_t size)
+{
+	struct rpc_rqst *rqst = task->tk_rqstp;
+	struct svc_rdma_op_ctxt *ctxt;
+	struct svcxprt_rdma *rdma;
+	struct svc_xprt *sxprt;
+	struct page *page;
+
+	if (size > PAGE_SIZE) {
+		WARN_ONCE(1, "failed to handle buffer allocation (size %zu)\n",
+			  size);
+		return NULL;
+	}
+
+	page = alloc_page(RPCRDMA_DEF_GFP);
+	if (!page)
+		return NULL;
+
+	sxprt = rqst->rq_xprt->bc_xprt;
+	rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
+	ctxt = svc_rdma_get_context_gfp(rdma, RPCRDMA_DEF_GFP);
+	if (!ctxt) {
+		put_page(page);
+		return NULL;
+	}
+
+	rqst->rq_privdata = ctxt;
+	ctxt->pages[0] = page;
+	ctxt->count = 1;
+	return page_address(page);
+}
+
+static void
+xprt_rdma_bc_free(void *buffer)
+{
+	/* No-op: ctxt and page have already been freed. */
+}
+
+static int
+rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
+{
+	struct rpc_xprt *xprt = rqst->rq_xprt;
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	struct rpcrdma_msg *headerp = (struct rpcrdma_msg *)rqst->rq_buffer;
+	struct svc_rdma_op_ctxt *ctxt;
+	int rc;
+
+	/* Space in the send buffer for an RPC/RDMA header is reserved
+	 * via xprt->tsh_size */
+	headerp->rm_xid = rqst->rq_xid;
+	headerp->rm_vers = rpcrdma_version;
+	headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests);
+	headerp->rm_type = rdma_msg;
+	headerp->rm_body.rm_chunks[0] = xdr_zero;
+	headerp->rm_body.rm_chunks[1] = xdr_zero;
+	headerp->rm_body.rm_chunks[2] = xdr_zero;
+
+	dprintk("%s: %*ph\n", __func__, 64, rqst->rq_buffer);
+
+	ctxt = (struct svc_rdma_op_ctxt *)rqst->rq_privdata;
+	rc = svc_rdma_bc_post_send(rdma, ctxt, &rqst->rq_snd_buf);
+	if (rc)
+		goto drop_connection;
+	return rc;
+
+drop_connection:
+	dprintk("Failed to send backchannel call\n");
+	svc_rdma_put_context(ctxt, 1);
+	xprt_disconnect_done(xprt);
+	return -ENOTCONN;
+}
+
+/* Send an RPC call on the passive end of a transport
+ * connection.
+ */
+static int
+xprt_rdma_bc_send_request(struct rpc_task *task)
+{
+	struct rpc_rqst *rqst = task->tk_rqstp;
+	struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
+	struct svcxprt_rdma *rdma;
+	u32 len;
+
+	dprintk("%s: sending request with xid: %08x\n",
+		__func__, be32_to_cpu(rqst->rq_xid));
+
+	if (!mutex_trylock(&sxprt->xpt_mutex)) {
+		rpc_sleep_on(&sxprt->xpt_bc_pending, task, NULL);
+		if (!mutex_trylock(&sxprt->xpt_mutex))
+			return -EAGAIN;
+		rpc_wake_up_queued_task(&sxprt->xpt_bc_pending, task);
+	}
+
+	len = -ENOTCONN;
+	rdma = container_of(sxprt, struct svcxprt_rdma, sc_xprt);
+	if (!test_bit(XPT_DEAD, &sxprt->xpt_flags))
+		len = rpcrdma_bc_send_request(rdma, rqst);
+
+	mutex_unlock(&sxprt->xpt_mutex);
+
+	if (len < 0)
+		return len;
+	return 0;
+}
+
+static void
+xprt_rdma_bc_close(struct rpc_xprt *xprt)
+{
+	dprintk("RPC:       %s: xprt %p\n", __func__, xprt);
+}
+
+static void
+xprt_rdma_bc_put(struct rpc_xprt *xprt)
+{
+	dprintk("RPC:       %s: xprt %p\n", __func__, xprt);
+
+	xprt_free(xprt);
+	module_put(THIS_MODULE);
+}
+
+/* It shouldn't matter if the number of backchannel session slots
+ * doesn't match the number of RPC/RDMA credits. That just means
+ * one or the other will have extra slots that aren't used.
+ */
+static struct rpc_xprt *
+xprt_setup_rdma_bc(struct xprt_create *args)
+{
+	struct rpc_xprt *xprt;
+	struct rpcrdma_xprt *new_xprt;
+
+	if (args->addrlen > sizeof(xprt->addr)) {
+		dprintk("RPC:       %s: address too large\n", __func__);
+		return ERR_PTR(-EBADF);
+	}
+
+	xprt = xprt_alloc(args->net, sizeof(*new_xprt),
+			  RPCRDMA_MAX_BC_REQUESTS,
+			  RPCRDMA_MAX_BC_REQUESTS);
+	if (xprt == NULL) {
+		dprintk("RPC:       %s: couldn't allocate rpc_xprt\n",
+			__func__);
+		return ERR_PTR(-ENOMEM);
+	}
+
+	xprt->timeout = &xprt_rdma_default_timeout;
+	xprt_set_bound(xprt);
+	xprt_set_connected(xprt);
+	xprt->bind_timeout = RPCRDMA_BIND_TO;
+	xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
+	xprt->idle_timeout = RPCRDMA_IDLE_DISC_TO;
+
+	xprt->prot = XPRT_TRANSPORT_BC_RDMA;
+	xprt->tsh_size = RPCRDMA_HDRLEN_MIN / sizeof(__be32);
+	xprt->ops = &xprt_rdma_bc_procs;
+
+	memcpy(&xprt->addr, args->dstaddr, args->addrlen);
+	xprt->addrlen = args->addrlen;
+	xprt_rdma_format_addresses(xprt, (struct sockaddr *)&xprt->addr);
+	xprt->resvport = 0;
+
+	xprt->max_payload = xprt_rdma_max_inline_read;
+
+	new_xprt = rpcx_to_rdmax(xprt);
+	new_xprt->rx_buf.rb_bc_max_requests = xprt->max_reqs;
+
+	xprt_get(xprt);
+	args->bc_xprt->xpt_bc_xprt = xprt;
+	xprt->bc_xprt = args->bc_xprt;
+
+	if (!try_module_get(THIS_MODULE))
+		goto out_fail;
+
+	/* Final put for backchannel xprt is in __svc_rdma_free */
+	xprt_get(xprt);
+	return xprt;
+
+out_fail:
+	xprt_rdma_free_addresses(xprt);
+	args->bc_xprt->xpt_bc_xprt = NULL;
+	xprt_put(xprt);
+	xprt_free(xprt);
+	return ERR_PTR(-EINVAL);
+}
+
 /*
  * Plumbing for rpc transport switch and kernel module
  */
@@ -722,6 +913,28 @@  static struct xprt_class xprt_rdma = {
 	.setup			= xprt_setup_rdma,
 };
 
+static struct rpc_xprt_ops xprt_rdma_bc_procs = {
+	.reserve_xprt		= xprt_reserve_xprt_cong,
+	.release_xprt		= xprt_release_xprt_cong,
+	.alloc_slot		= xprt_alloc_slot,
+	.release_request	= xprt_release_rqst_cong,
+	.buf_alloc		= xprt_rdma_bc_allocate,
+	.buf_free		= xprt_rdma_bc_free,
+	.send_request		= xprt_rdma_bc_send_request,
+	.set_retrans_timeout	= xprt_set_retrans_timeout_def,
+	.close			= xprt_rdma_bc_close,
+	.destroy		= xprt_rdma_bc_put,
+	.print_stats		= xprt_rdma_print_stats
+};
+
+static struct xprt_class xprt_rdma_bc = {
+	.list			= LIST_HEAD_INIT(xprt_rdma_bc.list),
+	.name			= "rdma backchannel",
+	.owner			= THIS_MODULE,
+	.ident			= XPRT_TRANSPORT_BC_RDMA,
+	.setup			= xprt_setup_rdma_bc,
+};
+
 void xprt_rdma_cleanup(void)
 {
 	int rc;
@@ -740,6 +953,11 @@  void xprt_rdma_cleanup(void)
 
 	rpcrdma_destroy_wq();
 	frwr_destroy_recovery_wq();
+
+	rc = xprt_unregister_transport(&xprt_rdma_bc);
+	if (rc)
+		dprintk("RPC:       %s: xprt_unregister(bc) returned %i\n",
+			__func__, rc);
 }
 
 int xprt_rdma_init(void)
@@ -763,6 +981,14 @@  int xprt_rdma_init(void)
 		return rc;
 	}
 
+	rc = xprt_register_transport(&xprt_rdma_bc);
+	if (rc) {
+		xprt_unregister_transport(&xprt_rdma);
+		rpcrdma_destroy_wq();
+		frwr_destroy_recovery_wq();
+		return rc;
+	}
+
 	dprintk("RPCRDMA Module Init, register RPC RDMA transport\n");
 
 	dprintk("Defaults:\n");
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 9aeff2b..485027e 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -148,6 +148,8 @@  rdmab_to_msg(struct rpcrdma_regbuf *rb)
 	return (struct rpcrdma_msg *)rb->rg_base;
 }
 
+#define RPCRDMA_DEF_GFP		(GFP_NOIO | __GFP_NOWARN)
+
 /*
  * struct rpcrdma_rep -- this structure encapsulates state required to recv
  * and complete a reply, asychronously. It needs several pieces of