From patchwork Thu Sep 17 20:45:33 2015 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Chuck Lever X-Patchwork-Id: 7210321 Return-Path: X-Original-To: patchwork-linux-rdma@patchwork.kernel.org Delivered-To: patchwork-parsemail@patchwork1.web.kernel.org Received: from mail.kernel.org (mail.kernel.org [198.145.29.136]) by patchwork1.web.kernel.org (Postfix) with ESMTP id 8C01A9F32B for ; Thu, 17 Sep 2015 20:45:41 +0000 (UTC) Received: from mail.kernel.org (localhost [127.0.0.1]) by mail.kernel.org (Postfix) with ESMTP id 73E6C2077D for ; Thu, 17 Sep 2015 20:45:40 +0000 (UTC) Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by mail.kernel.org (Postfix) with ESMTP id 8BB3E20785 for ; Thu, 17 Sep 2015 20:45:38 +0000 (UTC) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1752009AbbIQUpg (ORCPT ); Thu, 17 Sep 2015 16:45:36 -0400 Received: from mail-qg0-f47.google.com ([209.85.192.47]:33379 "EHLO mail-qg0-f47.google.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1751976AbbIQUpf (ORCPT ); Thu, 17 Sep 2015 16:45:35 -0400 Received: by qgev79 with SMTP id v79so23454712qge.0; Thu, 17 Sep 2015 13:45:34 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20120113; h=sender:subject:from:to:date:message-id:in-reply-to:references :user-agent:mime-version:content-type:content-transfer-encoding; bh=NX3xi/hsuKV4Ygfx3ADyxEUAITwtebI6iMyzVpCQTJc=; b=Sim5S8qIP5cxp5MW7qn4V2fLKMp1xvte0jFkB40juQLCxy4X32j8p2WA6/9rSOi9p/ q6eHY7/TEybMzfsrR7EOcYFYjOsvbo6h2IoMqmMSnkcql4erslYdKrUMLZdFIA2BYwDE H0lsjXO/TVFrVDuL6mNDD8Z/BXpiyhR0Ql4uaShf9+7SF/rfqRDkY7Xn3PW8/RtV9XxU upYr3/36BLAmiJG/EEjGe3SJWMyOsQSvTaq8U5eH63yesQCEyRsQJHwDrqj7WcqCemTE BzLvJwUQ6jQ9wY/PrkK/9p8dk+m/hrmsM0FbxGFNseOLVi2ev3F5CmelfnCjdmCtouM1 lUgg== X-Received: by 10.140.152.73 with SMTP id 70mr2223089qhy.15.1442522734553; Thu, 17 Sep 2015 13:45:34 -0700 (PDT) Received: from manet.1015granger.net ([2604:8800:100:81fc:82ee:73ff:fe43:d64f]) by smtp.gmail.com with ESMTPSA id g71sm2048287qkh.30.2015.09.17.13.45.33 (version=TLSv1.2 cipher=ECDHE-RSA-AES128-GCM-SHA256 bits=128/128); Thu, 17 Sep 2015 13:45:34 -0700 (PDT) Subject: [PATCH v1 10/18] xprtrdma: Handle incoming backward direction RPC calls From: Chuck Lever To: linux-rdma@vger.kernel.org, linux-nfs@vger.kernel.org Date: Thu, 17 Sep 2015 16:45:33 -0400 Message-ID: <20150917204533.19671.80225.stgit@manet.1015granger.net> In-Reply-To: <20150917202829.19671.90044.stgit@manet.1015granger.net> References: <20150917202829.19671.90044.stgit@manet.1015granger.net> User-Agent: StGit/0.17.1-3-g7d0f MIME-Version: 1.0 Sender: linux-rdma-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: linux-rdma@vger.kernel.org X-Spam-Status: No, score=-6.8 required=5.0 tests=BAYES_00,DKIM_SIGNED, RCVD_IN_DNSWL_HI,T_DKIM_INVALID,T_RP_MATCHES_RCVD,UNPARSEABLE_RELAY autolearn=unavailable version=3.3.1 X-Spam-Checker-Version: SpamAssassin 3.3.1 (2010-03-16) on mail.kernel.org X-Virus-Scanned: ClamAV using ClamSMTP Introduce a code path in the rpcrdma_reply_handler() to catch incoming backward direction RPC calls and route them to the ULP's backchannel server. Signed-off-by: Chuck Lever --- net/sunrpc/xprtrdma/backchannel.c | 115 +++++++++++++++++++++++++++++++++++++ net/sunrpc/xprtrdma/rpc_rdma.c | 41 +++++++++++++ net/sunrpc/xprtrdma/xprt_rdma.h | 2 + 3 files changed, 158 insertions(+) -- To unsubscribe from this list: send the line "unsubscribe linux-rdma" in the body of a message to majordomo@vger.kernel.org More majordomo info at http://vger.kernel.org/majordomo-info.html diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c index cc9c762..2eee18a 100644 --- a/net/sunrpc/xprtrdma/backchannel.c +++ b/net/sunrpc/xprtrdma/backchannel.c @@ -5,6 +5,8 @@ */ #include +#include +#include #include "xprt_rdma.h" @@ -12,6 +14,8 @@ # define RPCDBG_FACILITY RPCDBG_TRANS #endif +#define RPCRDMA_BACKCHANNEL_DEBUG + static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt, struct rpc_rqst *rqst) { @@ -251,3 +255,114 @@ void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst) list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list); spin_unlock_bh(&xprt->bc_pa_lock); } + +/** + * rpcrdma_bc_receive_call - Handle a backward direction call + * @xprt: transport receiving the call + * @rep: receive buffer containing the call + * + * Called in the RPC reply handler, which runs in a tasklet. + * Be quick about it. + * + * Operational assumptions: + * o Backchannel credits are ignored, just as the NFS server + * forechannel currently does + * o The ULP manages a replay cache (eg, NFSv4.1 sessions). + * No replay detection is done at the transport level + */ +void rpcrdma_bc_receive_call(struct rpcrdma_xprt *r_xprt, + struct rpcrdma_rep *rep) +{ + struct rpc_xprt *xprt = &r_xprt->rx_xprt; + struct rpcrdma_msg *headerp; + struct svc_serv *bc_serv; + struct rpcrdma_req *req; + struct rpc_rqst *rqst; + struct xdr_buf *buf; + size_t size; + __be32 *p; + + headerp = rdmab_to_msg(rep->rr_rdmabuf); +#ifdef RPCRDMA_BACKCHANNEL_DEBUG + pr_info("RPC: %s: callback XID %08x, length=%u\n", + __func__, be32_to_cpu(headerp->rm_xid), rep->rr_len); + pr_info("RPC: %s: %*ph\n", __func__, rep->rr_len, headerp); +#endif + + /* Sanity check: + * Need at least enough bytes for RPC/RDMA header, as code + * here references the header fields by array offset. Also, + * backward calls are always inline, so ensure there + * are some bytes beyond the RPC/RDMA header. + */ + if (rep->rr_len < RPCRDMA_HDRLEN_MIN + 24) + goto out_short; + p = (__be32 *)((unsigned char *)headerp + RPCRDMA_HDRLEN_MIN); + size = rep->rr_len - RPCRDMA_HDRLEN_MIN; + + /* Grab a free bc rqst */ + spin_lock(&xprt->bc_pa_lock); + if (list_empty(&xprt->bc_pa_list)) { + spin_unlock(&xprt->bc_pa_lock); + goto out_overflow; + } + rqst = list_first_entry(&xprt->bc_pa_list, + struct rpc_rqst, rq_bc_pa_list); + list_del(&rqst->rq_bc_pa_list); + spin_unlock(&xprt->bc_pa_lock); +#ifdef RPCRDMA_BACKCHANNEL_DEBUG + pr_info("RPC: %s: using rqst %p\n", __func__, rqst); +#endif + + /* Prepare rqst */ + rqst->rq_reply_bytes_recvd = 0; + rqst->rq_bytes_sent = 0; + rqst->rq_xid = headerp->rm_xid; + set_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state); + + buf = &rqst->rq_rcv_buf; + memset(buf, 0, sizeof(*buf)); + buf->head[0].iov_base = p; + buf->head[0].iov_len = size; + buf->len = size; + + /* The receive buffer has to be hooked to the rpcrdma_req so that + * it can be reposted after the server is done parsing it but just + * before sending the backward direction reply. */ + req = rpcr_to_rdmar(rqst); +#ifdef RPCRDMA_BACKCHANNEL_DEBUG + pr_info("RPC: %s: attaching rep %p to req %p\n", + __func__, rep, req); +#endif + req->rl_reply = rep; + + /* Defeat the retransmit detection logic in send_request */ + req->rl_connect_cookie = 0; + + /* Queue rqst for ULP's callback service */ + bc_serv = xprt->bc_serv; + spin_lock(&bc_serv->sv_cb_lock); + list_add(&rqst->rq_bc_list, &bc_serv->sv_cb_list); + spin_unlock(&bc_serv->sv_cb_lock); + + wake_up(&bc_serv->sv_cb_waitq); + + r_xprt->rx_stats.bcall_count++; + return; + +out_overflow: + pr_warn("RPC/RDMA backchannel overflow\n"); + xprt_disconnect_done(xprt); + /* This receive buffer gets reposted automatically + * when the connection is re-established. */ + return; + +out_short: + pr_warn("RPC/RDMA short backward direction call\n"); + + if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep)) + xprt_disconnect_done(xprt); + else + pr_warn("RPC: %s: reposting rep %p\n", + __func__, rep); +} diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c index d0dbbf7..3830250 100644 --- a/net/sunrpc/xprtrdma/rpc_rdma.c +++ b/net/sunrpc/xprtrdma/rpc_rdma.c @@ -716,6 +716,37 @@ rpcrdma_connect_worker(struct work_struct *work) spin_unlock_bh(&xprt->transport_lock); } +#if defined(CONFIG_SUNRPC_BACKCHANNEL) +/* By convention, backchannel calls arrive via rdma_msg type + * messages, and never populate the chunk lists. This makes + * the RPC/RDMA header small and fixed in size, so it is + * straightforward to check the RPC header's direction field. + */ +static bool +rpcrdma_is_bcall(struct rpcrdma_msg *headerp) +{ + __be32 *p = (__be32 *)headerp; + + if (headerp->rm_type != rdma_msg) + return false; + if (headerp->rm_body.rm_chunks[0] != xdr_zero) + return false; + if (headerp->rm_body.rm_chunks[1] != xdr_zero) + return false; + if (headerp->rm_body.rm_chunks[2] != xdr_zero) + return false; + + /* sanity */ + if (p[7] != headerp->rm_xid) + return false; + /* call direction */ + if (p[8] != cpu_to_be32(RPC_CALL)) + return false; + + return true; +} +#endif /* CONFIG_SUNRPC_BACKCHANNEL */ + /* * This function is called when an async event is posted to * the connection which changes the connection state. All it @@ -756,6 +787,10 @@ rpcrdma_reply_handler(struct rpcrdma_rep *rep) headerp = rdmab_to_msg(rep->rr_rdmabuf); if (headerp->rm_vers != rpcrdma_version) goto out_badversion; +#if defined(CONFIG_SUNRPC_BACKCHANNEL) + if (rpcrdma_is_bcall(headerp)) + goto out_bcall; +#endif /* Get XID and try for a match. */ spin_lock(&xprt->transport_lock); @@ -877,6 +912,12 @@ out_badstatus: } return; +#if defined(CONFIG_SUNRPC_BACKCHANNEL) +out_bcall: + rpcrdma_bc_receive_call(r_xprt, rep); + return; +#endif + out_shortreply: dprintk("RPC: %s: short/invalid reply\n", __func__); goto repost; diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index a59ce18..3e513e7c 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -352,6 +352,7 @@ struct rpcrdma_stats { unsigned long failed_marshal_count; unsigned long bad_reply_count; unsigned long nomsg_call_count; + unsigned long bcall_count; }; /* @@ -516,6 +517,7 @@ void xprt_rdma_cleanup(void); #if defined(CONFIG_SUNRPC_BACKCHANNEL) int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int); int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int); +void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *); int rpcrdma_bc_marshal_reply(struct rpc_rqst *); void xprt_rdma_bc_free_rqst(struct rpc_rqst *); void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int);