From patchwork Fri Oct 16 13:25:48 2015 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Chuck Lever X-Patchwork-Id: 7415271 Return-Path: X-Original-To: patchwork-linux-nfs@patchwork.kernel.org Delivered-To: patchwork-parsemail@patchwork1.web.kernel.org Received: from mail.kernel.org (mail.kernel.org [198.145.29.136]) by patchwork1.web.kernel.org (Postfix) with ESMTP id 61FA39F36A for ; Fri, 16 Oct 2015 13:25:54 +0000 (UTC) Received: from mail.kernel.org (localhost [127.0.0.1]) by mail.kernel.org (Postfix) with ESMTP id 2B3FB2053E for ; Fri, 16 Oct 2015 13:25:53 +0000 (UTC) Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by mail.kernel.org (Postfix) with ESMTP id F04CE204FC for ; Fri, 16 Oct 2015 13:25:51 +0000 (UTC) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1753200AbbJPNZu (ORCPT ); Fri, 16 Oct 2015 09:25:50 -0400 Received: from mail-qk0-f169.google.com ([209.85.220.169]:36184 "EHLO mail-qk0-f169.google.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1752228AbbJPNZt (ORCPT ); Fri, 16 Oct 2015 09:25:49 -0400 Received: by qkht68 with SMTP id t68so53863686qkh.3; Fri, 16 Oct 2015 06:25:49 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20120113; h=sender:subject:from:to:cc:date:message-id:in-reply-to:references :user-agent:mime-version:content-type:content-transfer-encoding; bh=kg8msWWqnJxjAWIEYJuIjlFhCu7u2h3Rt2pDuc1JFQY=; b=a1lPSAIxAj3stXk6/cVjMr05pIcc8n0/B9YQdAUsQzxMBOXkOSBnnSf+vHR3n3YT3a tpW9UgBJnM2g/HeZHO1lep++gytwM6JwPeAPvd/Mw6IhddPpdOlC6abO3/LVLeUR6Uun xLHhyoeHIggIdmJ+gUwm+uJiWPoozRLenuS1zyuXBHioi7c6uSnzZBgec8LwQlUZL4Bd DAuMIthIbuiTHVKcCkFGQiUjyUV2302JWHbCu5tNE8A8AGRdWieWLyCALLcaAP5IEOsX 9th2wKst0UUCheHDcUZU2+jQ7OyoT9QqaXoUxSIdf4szAHM7fymcLfisHXYEYRlAo+Lp jefQ== X-Received: by 10.55.18.164 with SMTP id 36mr18752888qks.67.1445001949156; Fri, 16 Oct 2015 06:25:49 -0700 (PDT) Received: from oracle-122.nfsv4bat.org (nat-pool-rdu-u.redhat.com. [66.187.233.203]) by smtp.gmail.com with ESMTPSA id 138sm7662183qhw.27.2015.10.16.06.25.48 (version=TLSv1.2 cipher=ECDHE-RSA-AES128-GCM-SHA256 bits=128/128); Fri, 16 Oct 2015 06:25:48 -0700 (PDT) Subject: [PATCH v3 13/16] xprtrdma: Handle incoming backward direction RPC calls From: Chuck Lever To: anna.schumaker@netapp.com Cc: linux-rdma@vger.kernel.org, linux-nfs@vger.kernel.org Date: Fri, 16 Oct 2015 09:25:48 -0400 Message-ID: <20151016132548.6819.37320.stgit@oracle-122.nfsv4bat.org> In-Reply-To: <20151016131958.6819.98407.stgit@oracle-122.nfsv4bat.org> References: <20151016131958.6819.98407.stgit@oracle-122.nfsv4bat.org> User-Agent: StGit/0.17.1-3-g7d0f MIME-Version: 1.0 Sender: linux-nfs-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: linux-nfs@vger.kernel.org X-Spam-Status: No, score=-6.8 required=5.0 tests=BAYES_00,DKIM_SIGNED, RCVD_IN_DNSWL_HI,T_DKIM_INVALID,T_RP_MATCHES_RCVD,UNPARSEABLE_RELAY autolearn=ham version=3.3.1 X-Spam-Checker-Version: SpamAssassin 3.3.1 (2010-03-16) on mail.kernel.org X-Virus-Scanned: ClamAV using ClamSMTP Introduce a code path in the rpcrdma_reply_handler() to catch incoming backward direction RPC calls and route them to the ULP's backchannel server. Signed-off-by: Chuck Lever Reviewed-by: Sagi Grimberg Tested-By: Devesh Sharma --- net/sunrpc/xprtrdma/backchannel.c | 118 +++++++++++++++++++++++++++++++++++++ net/sunrpc/xprtrdma/rpc_rdma.c | 41 +++++++++++++ net/sunrpc/xprtrdma/xprt_rdma.h | 2 + 3 files changed, 161 insertions(+) -- To unsubscribe from this list: send the line "unsubscribe linux-nfs" in the body of a message to majordomo@vger.kernel.org More majordomo info at http://vger.kernel.org/majordomo-info.html diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c index ffc4853..0b3387f 100644 --- a/net/sunrpc/xprtrdma/backchannel.c +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -5,6 +5,8 @@ */ #include +#include +#include #include "xprt_rdma.h" @@ -12,6 +14,8 @@ # define RPCDBG_FACILITY RPCDBG_TRANS #endif +#define RPCRDMA_BACKCHANNEL_DEBUG + static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) { @@ -253,3 +257,117 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst) list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list); spin_unlock_bh(&xprt->bc_pa_lock); } + +/** + * rpcrdma_bc_receive_call - Handle a backward direction call + * @xprt: transport receiving the call + * @rep: receive buffer containing the call + * + * Called in the RPC reply handler, which runs in a tasklet. + * Be quick about it. + * + * Operational assumptions: + * o Backchannel credits are ignored, just as the NFS server + * forechannel currently does + * o The ULP manages a replay cache (eg, NFSv4.1 sessions). + * No replay detection is done at the transport level + */ +void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_rep *rep) +{ + struct rpc_xprt *xprt = &r_xprt->rx_xprt; + struct rpcrdma_msg *headerp; + struct svc_serv *bc_serv; + struct rpcrdma_req *req; + struct rpc_rqst *rqst; + struct xdr_buf *buf; + size_t size; + __be32 *p; + + headerp = rdmab_to_msg(rep->rr_rdmabuf); +#ifdef RPCRDMA_BACKCHANNEL_DEBUG + pr_info("RPC: %s: callback XID %08x, length=%u\n", + __func__, be32_to_cpu(headerp->rm_xid), rep->rr_len); + pr_info("RPC: %s: %*ph\n", __func__, rep->rr_len, headerp); +#endif + + /* Sanity check: + * Need at least enough bytes for RPC/RDMA header, as code + * here references the header fields by array offset. Also, + * backward calls are always inline, so ensure there + * are some bytes beyond the RPC/RDMA header. + */ + if (rep->rr_len < RPCRDMA_HDRLEN_MIN + 24) + goto out_short; + p = (__be32 *)((unsigned char *)headerp + RPCRDMA_HDRLEN_MIN); + size = rep->rr_len - RPCRDMA_HDRLEN_MIN; + + /* Grab a free bc rqst */ + spin_lock(&xprt->bc_pa_lock); + if (list_empty(&xprt->bc_pa_list)) { + spin_unlock(&xprt->bc_pa_lock); + goto out_overflow; + } + rqst = list_first_entry(&xprt->bc_pa_list, + struct rpc_rqst, rq_bc_pa_list); + list_del(&rqst->rq_bc_pa_list); + spin_unlock(&xprt->bc_pa_lock); +#ifdef RPCRDMA_BACKCHANNEL_DEBUG + pr_info("RPC: %s: using rqst %p\n", __func__, rqst); +#endif + + /* Prepare rqst */ + rqst->rq_reply_bytes_recvd = 0; + rqst->rq_bytes_sent = 0; + rqst->rq_xid = headerp->rm_xid; + set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state); + + buf = &rqst->rq_rcv_buf; + memset(buf, 0, sizeof(*buf)); + buf->head[0].iov_base = p; + buf->head[0].iov_len = size; + buf->len = size; + + /* The receive buffer has to be hooked to the rpcrdma_req + * so that it can be reposted after the server is done + * parsing it but just before sending the backward + * direction reply. + */ + req = rpcr_to_rdmar(rqst); +#ifdef RPCRDMA_BACKCHANNEL_DEBUG + pr_info("RPC: %s: attaching rep %p to req %p\n", + __func__, rep, req); +#endif + req->rl_reply = rep; + + /* Defeat the retransmit detection logic in send_request */ + req->rl_connect_cookie = 0; + + /* Queue rqst for ULP's callback service */ + bc_serv = xprt->bc_serv; + spin_lock(&bc_serv->sv_cb_lock); + list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list); + spin_unlock(&bc_serv->sv_cb_lock); + + wake_up(&bc_serv->sv_cb_waitq); + + r_xprt->rx_stats.bcall_count++; + return; + +out_overflow: + pr_warn("RPC/RDMA backchannel overflow\n"); + xprt_disconnect_done(xprt); + /* This receive buffer gets reposted automatically + * when the connection is re-established. + */ + return; + +out_short: + pr_warn("RPC/RDMA short backward direction call\n"); + + if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep)) + xprt_disconnect_done(xprt); + else + pr_warn("RPC: %s: reposting rep %p\n", + __func__, rep); +} diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index b7a21e5..c10d969 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -716,6 +716,37 @@ rpcrdma_connect_worker(struct work_struct *work) spin_unlock_bh(&xprt->transport_lock); } +#if defined(CONFIG_SUNRPC_BACKCHANNEL) +/* By convention, backchannel calls arrive via rdma_msg type + * messages, and never populate the chunk lists. This makes + * the RPC/RDMA header small and fixed in size, so it is + * straightforward to check the RPC header's direction field. + */ +static bool +rpcrdma_is_bcall(struct rpcrdma_msg *headerp) +{ + __be32 *p = (__be32 *)headerp; + + if (headerp->rm_type != rdma_msg) + return false; + if (headerp->rm_body.rm_chunks[0] != xdr_zero) + return false; + if (headerp->rm_body.rm_chunks[1] != xdr_zero) + return false; + if (headerp->rm_body.rm_chunks[2] != xdr_zero) + return false; + + /* sanity */ + if (p[7] != headerp->rm_xid) + return false; + /* call direction */ + if (p[8] != cpu_to_be32(RPC_CALL)) + return false; + + return true; +} +#endif /* CONFIG_SUNRPC_BACKCHANNEL */ + /* * This function is called when an async event is posted to * the connection which changes the connection state. All it @@ -756,6 +787,10 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) headerp = rdmab_to_msg(rep->rr_rdmabuf); if (headerp->rm_vers != rpcrdma_version) goto out_badversion; +#if defined(CONFIG_SUNRPC_BACKCHANNEL) + if (rpcrdma_is_bcall(headerp)) + goto out_bcall; +#endif /* Match incoming rpcrdma_rep to an rpcrdma_req to * get context for handling any incoming chunks. @@ -878,6 +913,12 @@ out_badstatus: } return; +#if defined(CONFIG_SUNRPC_BACKCHANNEL) +out_bcall: + rpcrdma_bc_receive_call(r_xprt, rep); + return; +#endif + out_shortreply: dprintk("RPC: %s: short/invalid reply\n", __func__); goto repost; diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index e2d23ea..eb87d96e8 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -353,6 +353,7 @@ struct rpcrdma_stats { unsigned long failed_marshal_count; unsigned long bad_reply_count; unsigned long nomsg_call_count; + unsigned long bcall_count; }; /* @@ -520,6 +521,7 @@ void xprt_rdma_cleanup(void); #if defined(CONFIG_SUNRPC_BACKCHANNEL) int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int); int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int); +void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *); int rpcrdma_bc_marshal_reply(struct rpc_rqst *); void xprt_rdma_bc_free_rqst(struct rpc_rqst *); void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int);