From patchwork Thu Sep 17 20:44:52 2015 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Chuck Lever III X-Patchwork-Id: 7210211 Return-Path: X-Original-To: patchwork-linux-rdma@patchwork.kernel.org Delivered-To: patchwork-parsemail@patchwork1.web.kernel.org Received: from mail.kernel.org (mail.kernel.org [198.145.29.136]) by patchwork1.web.kernel.org (Postfix) with ESMTP id 8F2129F32B for ; Thu, 17 Sep 2015 20:44:58 +0000 (UTC) Received: from mail.kernel.org (localhost [127.0.0.1]) by mail.kernel.org (Postfix) with ESMTP id 487F920783 for ; Thu, 17 Sep 2015 20:44:57 +0000 (UTC) Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by mail.kernel.org (Postfix) with ESMTP id E405D2077A for ; Thu, 17 Sep 2015 20:44:55 +0000 (UTC) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1752053AbbIQUoy (ORCPT ); Thu, 17 Sep 2015 16:44:54 -0400 Received: from mail-qg0-f52.google.com ([209.85.192.52]:34849 "EHLO mail-qg0-f52.google.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1751752AbbIQUoy (ORCPT ); Thu, 17 Sep 2015 16:44:54 -0400 Received: by qgt47 with SMTP id 47so23412405qgt.2; Thu, 17 Sep 2015 13:44:53 -0700 (PDT) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=gmail.com; s=20120113; h=sender:subject:from:to:date:message-id:in-reply-to:references :user-agent:mime-version:content-type:content-transfer-encoding; bh=RDWDKRYLL5WaLNYH0XTZN3HPzGVMNs9PlYmrYlBGK0Y=; b=SDCM7o+9cVwUHSdzxCM4FDiDt4dbyP2WtxpkGJiDJPUrqdepC1owVes8O30xMZ9VBH G8FD+965SL5WVPSsqeRTGTdeyq7Pvhr+CKLlO+2qZoPMClU6oS/2S/rMLG1vo/4B7nON 1luvzKQKoy89YIgH9OEjF/i6O6XuDjytFmZ2WSXBvpW66T7lL+8liF0Mhq+0hZRZkuPM a5r0SSUw7gT2IJBhFSDpkX+UVl1dCL1P90LXLH7XYkVFXFWIfK+xNHejssK/eyaA5CZk k22pO96Cq7lKmgQUVQs2AsEHUPWH7Y3zUDuFKI6IySu1FB900hmZkvpimbctewwcIVgw aUvQ== X-Received: by 10.141.23.210 with SMTP id z201mr2175940qhd.104.1442522693349; Thu, 17 Sep 2015 13:44:53 -0700 (PDT) Received: from manet.1015granger.net ([2604:8800:100:81fc:82ee:73ff:fe43:d64f]) by smtp.gmail.com with ESMTPSA id 187sm2042783qhf.16.2015.09.17.13.44.52 (version=TLSv1.2 cipher=ECDHE-RSA-AES128-GCM-SHA256 bits=128/128); Thu, 17 Sep 2015 13:44:53 -0700 (PDT) Subject: [PATCH v1 05/18] xprtrdma: Replace send and receive arrays From: Chuck Lever To: linux-rdma@vger.kernel.org, linux-nfs@vger.kernel.org Date: Thu, 17 Sep 2015 16:44:52 -0400 Message-ID: <20150917204452.19671.66113.stgit@manet.1015granger.net> In-Reply-To: <20150917202829.19671.90044.stgit@manet.1015granger.net> References: <20150917202829.19671.90044.stgit@manet.1015granger.net> User-Agent: StGit/0.17.1-3-g7d0f MIME-Version: 1.0 Sender: linux-rdma-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: linux-rdma@vger.kernel.org X-Spam-Status: No, score=-6.8 required=5.0 tests=BAYES_00,DKIM_SIGNED, RCVD_IN_DNSWL_HI,T_DKIM_INVALID,T_RP_MATCHES_RCVD,UNPARSEABLE_RELAY autolearn=ham version=3.3.1 X-Spam-Checker-Version: SpamAssassin 3.3.1 (2010-03-16) on mail.kernel.org X-Virus-Scanned: ClamAV using ClamSMTP The rb_send_bufs and rb_recv_bufs arrays are used to implement a pair of stacks for keeping track of free rpcrdma_req and rpcrdma_rep structs. Replace those arrays with free lists. To allow more than 512 RPCs in-flight at once, each of these arrays would be larger than a page (assuming 8-byte addresses and 4KB pages). Allowing up to 64K in-flight RPCs (as TCP now does), each buffer array would have to be 128 pages. That's an order-6 allocation. (Not that we're going there.) A list is easier to expand dynamically. Instead of allocating a larger array of pointers and copying the existing pointers to the new array, simply append more buffers to each list. This also makes it simpler to manage receive buffers that might catch backwards-direction calls, or to post receive buffers in bulk to amortize the overhead of ib_post_recv. Signed-off-by: Chuck Lever --- net/sunrpc/xprtrdma/verbs.c | 141 +++++++++++++++++---------------------- net/sunrpc/xprtrdma/xprt_rdma.h | 9 +- 2 files changed, 66 insertions(+), 84 deletions(-) -- To unsubscribe from this list: send the line "unsubscribe linux-rdma" in the body of a message to majordomo@vger.kernel.org More majordomo info at http://vger.kernel.org/majordomo-info.html diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c index ac1345b..8d99214 100644 --- a/net/sunrpc/xprtrdma/verbs.c +++ b/net/sunrpc/xprtrdma/verbs.c @@ -962,44 +962,18 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) { struct rpcrdma_buffer *buf = &r_xprt->rx_buf; struct rpcrdma_ia *ia = &r_xprt->rx_ia; - struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data; - char *p; - size_t len; int i, rc; - buf->rb_max_requests = cdata->max_requests; + buf->rb_max_requests = r_xprt->rx_data.max_requests; spin_lock_init(&buf->rb_lock); - /* Need to allocate: - * 1. arrays for send and recv pointers - * 2. arrays of struct rpcrdma_req to fill in pointers - * 3. array of struct rpcrdma_rep for replies - * Send/recv buffers in req/rep need to be registered - */ - len = buf->rb_max_requests * - (sizeof(struct rpcrdma_req *) + sizeof(struct rpcrdma_rep *)); - - p = kzalloc(len, GFP_KERNEL); - if (p == NULL) { - dprintk("RPC: %s: req_t/rep_t/pad kzalloc(%zd) failed\n", - __func__, len); - rc = -ENOMEM; - goto out; - } - buf->rb_pool = p; /* for freeing it later */ - - buf->rb_send_bufs = (struct rpcrdma_req **) p; - p = (char *) &buf->rb_send_bufs[buf->rb_max_requests]; - buf->rb_recv_bufs = (struct rpcrdma_rep **) p; - p = (char *) &buf->rb_recv_bufs[buf->rb_max_requests]; - rc = ia->ri_ops->ro_init(r_xprt); if (rc) goto out; + INIT_LIST_HEAD(&buf->rb_send_bufs); for (i = 0; i < buf->rb_max_requests; i++) { struct rpcrdma_req *req; - struct rpcrdma_rep *rep; req = rpcrdma_create_req(r_xprt); if (IS_ERR(req)) { @@ -1008,7 +982,12 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) rc = PTR_ERR(req); goto out; } - buf->rb_send_bufs[i] = req; + list_add(&req->rl_free, &buf->rb_send_bufs); + } + + INIT_LIST_HEAD(&buf->rb_recv_bufs); + for (i = 0; i < buf->rb_max_requests + 2; i++) { + struct rpcrdma_rep *rep; rep = rpcrdma_create_rep(r_xprt); if (IS_ERR(rep)) { @@ -1017,7 +996,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt) rc = PTR_ERR(rep); goto out; } - buf->rb_recv_bufs[i] = rep; + list_add(&rep->rr_list, &buf->rb_recv_bufs); } return 0; @@ -1051,25 +1030,26 @@ void rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf) { struct rpcrdma_ia *ia = rdmab_to_ia(buf); - int i; - /* clean up in reverse order from create - * 1. recv mr memory (mr free, then kfree) - * 2. send mr memory (mr free, then kfree) - * 3. MWs - */ - dprintk("RPC: %s: entering\n", __func__); + while (!list_empty(&buf->rb_recv_bufs)) { + struct rpcrdma_rep *rep = list_entry(buf->rb_recv_bufs.next, + struct rpcrdma_rep, + rr_list); - for (i = 0; i < buf->rb_max_requests; i++) { - if (buf->rb_recv_bufs) - rpcrdma_destroy_rep(ia, buf->rb_recv_bufs[i]); - if (buf->rb_send_bufs) - rpcrdma_destroy_req(ia, buf->rb_send_bufs[i]); + list_del(&rep->rr_list); + rpcrdma_destroy_rep(ia, rep); } - ia->ri_ops->ro_destroy(buf); + while (!list_empty(&buf->rb_send_bufs)) { + struct rpcrdma_req *req = list_entry(buf->rb_send_bufs.next, + struct rpcrdma_req, + rl_free); - kfree(buf->rb_pool); + list_del(&req->rl_free); + rpcrdma_destroy_req(ia, req); + } + + ia->ri_ops->ro_destroy(buf); } struct rpcrdma_mw * @@ -1102,24 +1082,27 @@ rpcrdma_put_mw(struct rpcrdma_xprt *r_xprt, struct rpcrdma_mw *mw) } static void -rpcrdma_buffer_put_sendbuf(struct rpcrdma_req *req, struct rpcrdma_buffer *buf) +rpcrdma_buffer_put_locked(struct rpcrdma_rep *rep, struct rpcrdma_buffer *buf) { - buf->rb_send_bufs[--buf->rb_send_index] = req; - req->rl_niovs = 0; - if (req->rl_reply) { - buf->rb_recv_bufs[--buf->rb_recv_index] = req->rl_reply; - req->rl_reply = NULL; - } + list_add_tail(&rep->rr_list, &buf->rb_recv_bufs); +} + +static struct rpcrdma_rep * +rpcrdma_buffer_get_locked(struct rpcrdma_buffer *buf) +{ + struct rpcrdma_rep *rep; + + rep = list_first_entry(&buf->rb_recv_bufs, + struct rpcrdma_rep, rr_list); + list_del(&rep->rr_list); + + return rep; } /* * Get a set of request/reply buffers. * - * Reply buffer (if needed) is attached to send buffer upon return. - * Rule: - * rb_send_index and rb_recv_index MUST always be pointing to the - * *next* available buffer (non-NULL). They are incremented after - * removing buffers, and decremented *before* returning them. + * Reply buffer (if available) is attached to send buffer upon return. */ struct rpcrdma_req * rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) @@ -1129,25 +1112,22 @@ rpcrdma_buffer_get(struct rpcrdma_buffer *buffers) spin_lock_irqsave(&buffers->rb_lock, flags); - if (buffers->rb_send_index == buffers->rb_max_requests) { + if (list_empty(&buffers->rb_send_bufs)) { spin_unlock_irqrestore(&buffers->rb_lock, flags); - dprintk("RPC: %s: out of request buffers\n", __func__); - return ((struct rpcrdma_req *)NULL); - } - - req = buffers->rb_send_bufs[buffers->rb_send_index]; - if (buffers->rb_send_index < buffers->rb_recv_index) { - dprintk("RPC: %s: %d extra receives outstanding (ok)\n", - __func__, - buffers->rb_recv_index - buffers->rb_send_index); - req->rl_reply = NULL; - } else { - req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index]; - buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL; + pr_warn("RPC: %s: out of request buffers\n", __func__); + return NULL; } - buffers->rb_send_bufs[buffers->rb_send_index++] = NULL; + req = list_first_entry(&buffers->rb_send_bufs, + struct rpcrdma_req, rl_free); + list_del(&req->rl_free); + req->rl_reply = NULL; + if (!list_empty(&buffers->rb_recv_bufs)) + req->rl_reply = rpcrdma_buffer_get_locked(buffers); spin_unlock_irqrestore(&buffers->rb_lock, flags); + + if (!req->rl_reply) + pr_warn("RPC: %s: out of reply buffers\n", __func__); return req; } @@ -1159,17 +1139,22 @@ void rpcrdma_buffer_put(struct rpcrdma_req *req) { struct rpcrdma_buffer *buffers = req->rl_buffer; + struct rpcrdma_rep *rep = req->rl_reply; unsigned long flags; + req->rl_niovs = 0; + req->rl_reply = NULL; + spin_lock_irqsave(&buffers->rb_lock, flags); - rpcrdma_buffer_put_sendbuf(req, buffers); + list_add_tail(&req->rl_free, &buffers->rb_send_bufs); + if (rep) + rpcrdma_buffer_put_locked(rep, buffers); spin_unlock_irqrestore(&buffers->rb_lock, flags); } /* * Recover reply buffers from pool. - * This happens when recovering from error conditions. - * Post-increment counter/array index. + * This happens when recovering from disconnect. */ void rpcrdma_recv_buffer_get(struct rpcrdma_req *req) @@ -1178,10 +1163,8 @@ rpcrdma_recv_buffer_get(struct rpcrdma_req *req) unsigned long flags; spin_lock_irqsave(&buffers->rb_lock, flags); - if (buffers->rb_recv_index < buffers->rb_max_requests) { - req->rl_reply = buffers->rb_recv_bufs[buffers->rb_recv_index]; - buffers->rb_recv_bufs[buffers->rb_recv_index++] = NULL; - } + if (!list_empty(&buffers->rb_recv_bufs)) + req->rl_reply = rpcrdma_buffer_get_locked(buffers); spin_unlock_irqrestore(&buffers->rb_lock, flags); } @@ -1196,7 +1179,7 @@ rpcrdma_recv_buffer_put(struct rpcrdma_rep *rep) unsigned long flags; spin_lock_irqsave(&buffers->rb_lock, flags); - buffers->rb_recv_bufs[--buffers->rb_recv_index] = rep; + rpcrdma_buffer_put_locked(rep, buffers); spin_unlock_irqrestore(&buffers->rb_lock, flags); } diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h index a13508b..e6a358f 100644 --- a/net/sunrpc/xprtrdma/xprt_rdma.h +++ b/net/sunrpc/xprtrdma/xprt_rdma.h @@ -252,6 +252,7 @@ struct rpcrdma_mr_seg { /* chunk descriptors */ #define RPCRDMA_MAX_IOVS (2) struct rpcrdma_req { + struct list_head rl_free; unsigned int rl_niovs; unsigned int rl_nchunks; unsigned int rl_connect_cookie; @@ -285,12 +286,10 @@ struct rpcrdma_buffer { struct list_head rb_all; char *rb_pool; - spinlock_t rb_lock; /* protect buf arrays */ + spinlock_t rb_lock; /* protect buf lists */ + struct list_head rb_send_bufs; + struct list_head rb_recv_bufs; u32 rb_max_requests; - int rb_send_index; - int rb_recv_index; - struct rpcrdma_req **rb_send_bufs; - struct rpcrdma_rep **rb_recv_bufs; }; #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)