diff mbox

[v4,24/24] xprtrdma: Disconnect on registration failure

Message ID 20140522005740.27190.63869.stgit@manet.1015granger.net (mailing list archive)
State Superseded, archived
Headers show

Commit Message

Chuck Lever May 22, 2014, 12:57 a.m. UTC
If rpcrdma_register_external() fails during request marshaling, the
current RPC request is killed. Instead, this RPC should be retried
after reconnecting the transport instance.

The most likely reason for registration failure with FRMR is a
failed post_send, which would be due to a remote transport
disconnect or memory exhaustion. These issues can be recovered
by a retry.

Problems encountered in the marshaling logic itself will not be
corrected by trying again, so these should still kill a request.

Now that we've added a clean exit for marshaling errors, take the
opportunity to defang some BUG_ON's.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---

 net/sunrpc/xprtrdma/rpc_rdma.c  |   48 +++++++++++++++++++++++++--------------
 net/sunrpc/xprtrdma/transport.c |   17 +++++++++-----
 2 files changed, 42 insertions(+), 23 deletions(-)


--
To unsubscribe from this list: send the line "unsubscribe linux-rdma" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
diff mbox

Patch

diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 82173c7..e6a01e1 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -77,6 +77,8 @@  static const char transfertypes[][12] = {
  * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk
  * elements. Segments are then coalesced when registered, if possible
  * within the selected memreg mode.
+ *
+ * Returns positive number of segments converted, or a negative errno.
  */
 
 static int
@@ -103,12 +105,13 @@  rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
 			/* alloc the pagelist for receiving buffer */
 			ppages[p] = alloc_page(GFP_ATOMIC);
 			if (!ppages[p])
-				return 0;
+				return -ENOMEM;
 		}
 		seg[n].mr_page = ppages[p];
 		seg[n].mr_offset = (void *)(unsigned long) page_base;
 		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len);
-		BUG_ON(seg[n].mr_len > PAGE_SIZE);
+		if (seg[n].mr_len > PAGE_SIZE)
+			return -EIO;
 		len -= seg[n].mr_len;
 		++n;
 		++p;
@@ -117,7 +120,7 @@  rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
 
 	/* Message overflows the seg array */
 	if (len && n == nsegs)
-		return 0;
+		return -EIO;
 
 	if (xdrbuf->tail[0].iov_len) {
 		/* the rpcrdma protocol allows us to omit any trailing
@@ -126,7 +129,7 @@  rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
 			return n;
 		if (n == nsegs)
 			/* Tail remains, but we're out of segments */
-			return 0;
+			return -EIO;
 		seg[n].mr_page = NULL;
 		seg[n].mr_offset = xdrbuf->tail[0].iov_base;
 		seg[n].mr_len = xdrbuf->tail[0].iov_len;
@@ -167,15 +170,17 @@  rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
  *  Reply chunk (a counted array):
  *   N elements:
  *    1 - N - HLOO - HLOO - ... - HLOO
+ *
+ * Returns positive RPC/RDMA header size, or negative errno.
  */
 
-static unsigned int
+static ssize_t
 rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
 		struct rpcrdma_msg *headerp, enum rpcrdma_chunktype type)
 {
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
-	int nsegs, nchunks = 0;
+	int n, nsegs, nchunks = 0;
 	unsigned int pos;
 	struct rpcrdma_mr_seg *seg = req->rl_segments;
 	struct rpcrdma_read_chunk *cur_rchunk = NULL;
@@ -201,11 +206,11 @@  rpcrdma_create_chunks(struct rpc_rqst *rqst, struct xdr_buf *target,
 		pos = target->head[0].iov_len;
 
 	nsegs = rpcrdma_convert_iovs(target, pos, type, seg, RPCRDMA_MAX_SEGS);
-	if (nsegs == 0)
-		return 0;
+	if (nsegs < 0)
+		return nsegs;
 
 	do {
-		int n = rpcrdma_register_external(seg, nsegs,
+		n = rpcrdma_register_external(seg, nsegs,
 						cur_wchunk != NULL, r_xprt);
 		if (n <= 0)
 			goto out;
@@ -277,7 +282,7 @@  out:
 	for (pos = 0; nchunks--;)
 		pos += rpcrdma_deregister_external(
 				&req->rl_segments[pos], r_xprt);
-	return 0;
+	return n;
 }
 
 /*
@@ -359,6 +364,8 @@  rpcrdma_inline_pullup(struct rpc_rqst *rqst, int pad)
  *  [1] -- the RPC header/data, marshaled by RPC and the NFS protocol.
  *  [2] -- optional padding.
  *  [3] -- if padded, header only in [1] and data here.
+ *
+ * Returns zero on success, otherwise a negative errno.
  */
 
 int
@@ -368,7 +375,8 @@  rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 	char *base;
-	size_t hdrlen, rpclen, padlen;
+	size_t rpclen, padlen;
+	ssize_t hdrlen;
 	enum rpcrdma_chunktype rtype, wtype;
 	struct rpcrdma_msg *headerp;
 
@@ -439,7 +447,11 @@  rpcrdma_marshal_req(struct rpc_rqst *rqst)
 	/* The following simplification is not true forever */
 	if (rtype != rpcrdma_noch && wtype == rpcrdma_replych)
 		wtype = rpcrdma_noch;
-	BUG_ON(rtype != rpcrdma_noch && wtype != rpcrdma_noch);
+	if (rtype != rpcrdma_noch && wtype != rpcrdma_noch) {
+		dprintk("RPC:       %s: cannot marshal multiple "
+			"chunk lists\n", __func__);
+		return -EIO;
+	}
 
 	hdrlen = 28; /*sizeof *headerp;*/
 	padlen = 0;
@@ -464,8 +476,11 @@  rpcrdma_marshal_req(struct rpc_rqst *rqst)
 			headerp->rm_body.rm_padded.rm_pempty[1] = xdr_zero;
 			headerp->rm_body.rm_padded.rm_pempty[2] = xdr_zero;
 			hdrlen += 2 * sizeof(u32); /* extra words in padhdr */
-			BUG_ON(wtype != rpcrdma_noch);
-
+			if (wtype != rpcrdma_noch) {
+				dprintk("RPC:       %s: invalid chunk list\n",
+					__func__);
+				return -EIO;
+			}
 		} else {
 			headerp->rm_body.rm_nochunks.rm_empty[0] = xdr_zero;
 			headerp->rm_body.rm_nochunks.rm_empty[1] = xdr_zero;
@@ -500,9 +515,8 @@  rpcrdma_marshal_req(struct rpc_rqst *rqst)
 		hdrlen = rpcrdma_create_chunks(rqst,
 					&rqst->rq_rcv_buf, headerp, wtype);
 	}
-
-	if (hdrlen == 0)
-		return -1;
+	if (hdrlen < 0)
+		return hdrlen;
 
 	dprintk("RPC:       %s: %s: hdrlen %zd rpclen %zd padlen %zd"
 		" headerp 0x%p base 0x%p lkey 0x%x\n",
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index ad22fdc..1ba05bc 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -595,13 +595,12 @@  xprt_rdma_send_request(struct rpc_task *task)
 	struct rpc_xprt *xprt = rqst->rq_xprt;
 	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	int rc;
 
-	/* marshal the send itself */
-	if (req->rl_niovs == 0 && rpcrdma_marshal_req(rqst) != 0) {
-		r_xprt->rx_stats.failed_marshal_count++;
-		dprintk("RPC:       %s: rpcrdma_marshal_req failed\n",
-			__func__);
-		return -EIO;
+	if (req->rl_niovs == 0) {
+		rc = rpcrdma_marshal_req(rqst);
+		if (rc < 0)
+			goto failed_marshal;
 	}
 
 	if (req->rl_reply == NULL) 		/* e.g. reconnection */
@@ -625,6 +624,12 @@  xprt_rdma_send_request(struct rpc_task *task)
 	rqst->rq_bytes_sent = 0;
 	return 0;
 
+failed_marshal:
+	r_xprt->rx_stats.failed_marshal_count++;
+	dprintk("RPC:       %s: rpcrdma_marshal_req failed, status %i\n",
+		__func__, rc);
+	if (rc == -EIO)
+		return -EIO;
 drop_connection:
 	xprt_disconnect_done(xprt);
 	return -ENOTCONN;	/* implies disconnect */