
[1/2] SUNRPC: Fix up backchannel slot table accounting

Message ID 20190716200157.38583-1-trond.myklebust@hammerspace.com (mailing list archive)
State New, archived

Commit Message

Trond Myklebust July 16, 2019, 8:01 p.m. UTC
Add a per-transport maximum limit in the socket case, and add
helpers to allow the NFSv4 code to discover that limit.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
---
 fs/nfs/nfs4proc.c                 |  3 +++
 include/linux/sunrpc/bc_xprt.h    |  1 +
 include/linux/sunrpc/clnt.h       |  1 +
 include/linux/sunrpc/xprt.h       |  6 +++--
 net/sunrpc/backchannel_rqst.c     | 40 +++++++++++++++++--------------
 net/sunrpc/clnt.c                 | 13 ++++++++++
 net/sunrpc/svc.c                  |  2 +-
 net/sunrpc/xprtrdma/backchannel.c |  7 ++++++
 net/sunrpc/xprtrdma/transport.c   |  1 +
 net/sunrpc/xprtrdma/xprt_rdma.h   |  1 +
 net/sunrpc/xprtsock.c             |  1 +
 11 files changed, 55 insertions(+), 21 deletions(-)
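
Taken together, the change amounts to the NFSv4 layer asking the transport for its backchannel slot ceiling and clamping the CREATE_SESSION back-channel max_reqs to it. Below is a minimal userspace-style sketch of that clamp, with stub names and illustrative values standing in for kernel state; the real logic is in the nfs4proc.c hunk further down.

#include <stdio.h>

/* Stand-in for the NFS max_session_cb_slots tunable; value is illustrative. */
static unsigned int max_session_cb_slots = 16;

/* Stand-in for rpc_num_bc_slots(clnt): the transport's backchannel slot ceiling. */
static unsigned int rpc_num_bc_slots_stub(void)
{
	return 64;	/* BC_MAX_SLOTS for the socket transports in this patch */
}

int main(void)
{
	unsigned int max_reqs = max_session_cb_slots > 1 ? max_session_cb_slots : 1;
	unsigned int max_bc_slots = rpc_num_bc_slots_stub();

	/* Never advertise more CB session slots than the transport will service. */
	if (max_reqs > max_bc_slots)
		max_reqs = max_bc_slots;

	printf("bc_attrs.max_reqs = %u\n", max_reqs);
	return 0;
}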

Comments

Chuck Lever July 17, 2019, 1:55 p.m. UTC | #1
Hi Trond-

> On Jul 16, 2019, at 4:01 PM, Trond Myklebust <trondmy@gmail.com> wrote:
> 
> Add a per-transport maximum limit in the socket case, and add
> helpers to allow the NFSv4 code to discover that limit.

For RDMA, the number of credits is permitted to change during the life
of the connection, so this is not a fixed limit for such transports.

And, AFAICT, it's not necessary to know the transport's limit. The
lesser of the NFS backchannel and RPC/RDMA reverse credit limit will
be used.

The patch description doesn't explain why this change is now necessary,
so I don't get what's going on here.


> Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
> ---
> fs/nfs/nfs4proc.c                 |  3 +++
> include/linux/sunrpc/bc_xprt.h    |  1 +
> include/linux/sunrpc/clnt.h       |  1 +
> include/linux/sunrpc/xprt.h       |  6 +++--
> net/sunrpc/backchannel_rqst.c     | 40 +++++++++++++++++--------------
> net/sunrpc/clnt.c                 | 13 ++++++++++
> net/sunrpc/svc.c                  |  2 +-
> net/sunrpc/xprtrdma/backchannel.c |  7 ++++++
> net/sunrpc/xprtrdma/transport.c   |  1 +
> net/sunrpc/xprtrdma/xprt_rdma.h   |  1 +
> net/sunrpc/xprtsock.c             |  1 +
> 11 files changed, 55 insertions(+), 21 deletions(-)
> 
> diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
> index 52de7245a2ee..39896afc6edf 100644
> --- a/fs/nfs/nfs4proc.c
> +++ b/fs/nfs/nfs4proc.c
> @@ -8380,6 +8380,7 @@ static void nfs4_init_channel_attrs(struct nfs41_create_session_args *args,
> {
> 	unsigned int max_rqst_sz, max_resp_sz;
> 	unsigned int max_bc_payload = rpc_max_bc_payload(clnt);
> +	unsigned int max_bc_slots = rpc_num_bc_slots(clnt);
> 
> 	max_rqst_sz = NFS_MAX_FILE_IO_SIZE + nfs41_maxwrite_overhead;
> 	max_resp_sz = NFS_MAX_FILE_IO_SIZE + nfs41_maxread_overhead;
> @@ -8402,6 +8403,8 @@ static void nfs4_init_channel_attrs(struct nfs41_create_session_args *args,
> 	args->bc_attrs.max_resp_sz_cached = 0;
> 	args->bc_attrs.max_ops = NFS4_MAX_BACK_CHANNEL_OPS;
> 	args->bc_attrs.max_reqs = max_t(unsigned short, max_session_cb_slots, 1);
> +	if (args->bc_attrs.max_reqs > max_bc_slots)
> +		args->bc_attrs.max_reqs = max_bc_slots;
> 
> 	dprintk("%s: Back Channel : max_rqst_sz=%u max_resp_sz=%u "
> 		"max_resp_sz_cached=%u max_ops=%u max_reqs=%u\n",
> diff --git a/include/linux/sunrpc/bc_xprt.h b/include/linux/sunrpc/bc_xprt.h
> index d4229a78524a..87d27e13d885 100644
> --- a/include/linux/sunrpc/bc_xprt.h
> +++ b/include/linux/sunrpc/bc_xprt.h
> @@ -43,6 +43,7 @@ void xprt_destroy_backchannel(struct rpc_xprt *, unsigned int max_reqs);
> int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs);
> void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs);
> void xprt_free_bc_rqst(struct rpc_rqst *req);
> +unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt);
> 
> /*
>  * Determine if a shared backchannel is in use
> diff --git a/include/linux/sunrpc/clnt.h b/include/linux/sunrpc/clnt.h
> index 4e070e00c143..abc63bd1be2b 100644
> --- a/include/linux/sunrpc/clnt.h
> +++ b/include/linux/sunrpc/clnt.h
> @@ -194,6 +194,7 @@ void		rpc_setbufsize(struct rpc_clnt *, unsigned int, unsigned int);
> struct net *	rpc_net_ns(struct rpc_clnt *);
> size_t		rpc_max_payload(struct rpc_clnt *);
> size_t		rpc_max_bc_payload(struct rpc_clnt *);
> +unsigned int	rpc_num_bc_slots(struct rpc_clnt *);
> void		rpc_force_rebind(struct rpc_clnt *);
> size_t		rpc_peeraddr(struct rpc_clnt *, struct sockaddr *, size_t);
> const char	*rpc_peeraddr2str(struct rpc_clnt *, enum rpc_display_format_t);
> diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
> index ed76e5fb36c1..13e108bcc9eb 100644
> --- a/include/linux/sunrpc/xprt.h
> +++ b/include/linux/sunrpc/xprt.h
> @@ -158,6 +158,7 @@ struct rpc_xprt_ops {
> 	int		(*bc_setup)(struct rpc_xprt *xprt,
> 				    unsigned int min_reqs);
> 	size_t		(*bc_maxpayload)(struct rpc_xprt *xprt);
> +	unsigned int	(*bc_num_slots)(struct rpc_xprt *xprt);
> 	void		(*bc_free_rqst)(struct rpc_rqst *rqst);
> 	void		(*bc_destroy)(struct rpc_xprt *xprt,
> 				      unsigned int max_reqs);
> @@ -251,8 +252,9 @@ struct rpc_xprt {
> #if defined(CONFIG_SUNRPC_BACKCHANNEL)
> 	struct svc_serv		*bc_serv;       /* The RPC service which will */
> 						/* process the callback */
> -	int			bc_alloc_count;	/* Total number of preallocs */
> -	atomic_t		bc_free_slots;
> +	unsigned int		bc_alloc_max;
> +	unsigned int		bc_alloc_count;	/* Total number of preallocs */
> +	atomic_t		bc_slot_count;	/* Number of allocated slots */
> 	spinlock_t		bc_pa_lock;	/* Protects the preallocated
> 						 * items */
> 	struct list_head	bc_pa_list;	/* List of preallocated
> diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
> index c47d82622fd1..339e8c077c2d 100644
> --- a/net/sunrpc/backchannel_rqst.c
> +++ b/net/sunrpc/backchannel_rqst.c
> @@ -31,25 +31,20 @@ SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
> #define RPCDBG_FACILITY	RPCDBG_TRANS
> #endif
> 
> +#define BC_MAX_SLOTS	64U
> +
> +unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt)
> +{
> +	return BC_MAX_SLOTS;
> +}
> +
> /*
>  * Helper routines that track the number of preallocation elements
>  * on the transport.
>  */
> static inline int xprt_need_to_requeue(struct rpc_xprt *xprt)
> {
> -	return xprt->bc_alloc_count < atomic_read(&xprt->bc_free_slots);
> -}
> -
> -static inline void xprt_inc_alloc_count(struct rpc_xprt *xprt, unsigned int n)
> -{
> -	atomic_add(n, &xprt->bc_free_slots);
> -	xprt->bc_alloc_count += n;
> -}
> -
> -static inline int xprt_dec_alloc_count(struct rpc_xprt *xprt, unsigned int n)
> -{
> -	atomic_sub(n, &xprt->bc_free_slots);
> -	return xprt->bc_alloc_count -= n;
> +	return xprt->bc_alloc_count < xprt->bc_alloc_max;
> }
> 
> /*
> @@ -145,6 +140,9 @@ int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
> 
> 	dprintk("RPC:       setup backchannel transport\n");
> 
> +	if (min_reqs > BC_MAX_SLOTS)
> +		min_reqs = BC_MAX_SLOTS;
> +
> 	/*
> 	 * We use a temporary list to keep track of the preallocated
> 	 * buffers.  Once we're done building the list we splice it
> @@ -172,7 +170,9 @@ int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
> 	 */
> 	spin_lock(&xprt->bc_pa_lock);
> 	list_splice(&tmp_list, &xprt->bc_pa_list);
> -	xprt_inc_alloc_count(xprt, min_reqs);
> +	xprt->bc_alloc_count += min_reqs;
> +	xprt->bc_alloc_max += min_reqs;
> +	atomic_add(min_reqs, &xprt->bc_slot_count);
> 	spin_unlock(&xprt->bc_pa_lock);
> 
> 	dprintk("RPC:       setup backchannel transport done\n");
> @@ -220,11 +220,13 @@ void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
> 		goto out;
> 
> 	spin_lock_bh(&xprt->bc_pa_lock);
> -	xprt_dec_alloc_count(xprt, max_reqs);
> +	xprt->bc_alloc_max -= max_reqs;
> 	list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
> 		dprintk("RPC:        req=%p\n", req);
> 		list_del(&req->rq_bc_pa_list);
> 		xprt_free_allocation(req);
> +		xprt->bc_alloc_count--;
> +		atomic_dec(&xprt->bc_slot_count);
> 		if (--max_reqs == 0)
> 			break;
> 	}
> @@ -241,13 +243,14 @@ static struct rpc_rqst *xprt_get_bc_request(struct rpc_xprt *xprt, __be32 xid,
> 	struct rpc_rqst *req = NULL;
> 
> 	dprintk("RPC:       allocate a backchannel request\n");
> -	if (atomic_read(&xprt->bc_free_slots) <= 0)
> -		goto not_found;
> 	if (list_empty(&xprt->bc_pa_list)) {
> 		if (!new)
> 			goto not_found;
> +		if (atomic_read(&xprt->bc_slot_count) >= BC_MAX_SLOTS)
> +			goto not_found;
> 		list_add_tail(&new->rq_bc_pa_list, &xprt->bc_pa_list);
> 		xprt->bc_alloc_count++;
> +		atomic_inc(&xprt->bc_slot_count);
> 	}
> 	req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst,
> 				rq_bc_pa_list);
> @@ -291,6 +294,7 @@ void xprt_free_bc_rqst(struct rpc_rqst *req)
> 	if (xprt_need_to_requeue(xprt)) {
> 		list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
> 		xprt->bc_alloc_count++;
> +		atomic_inc(&xprt->bc_slot_count);
> 		req = NULL;
> 	}
> 	spin_unlock_bh(&xprt->bc_pa_lock);
> @@ -357,7 +361,7 @@ void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied)
> 
> 	spin_lock(&xprt->bc_pa_lock);
> 	list_del(&req->rq_bc_pa_list);
> -	xprt_dec_alloc_count(xprt, 1);
> +	xprt->bc_alloc_count--;
> 	spin_unlock(&xprt->bc_pa_lock);
> 
> 	req->rq_private_buf.len = copied;
> diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
> index 383555d2b522..79c849391cb9 100644
> --- a/net/sunrpc/clnt.c
> +++ b/net/sunrpc/clnt.c
> @@ -1526,6 +1526,19 @@ size_t rpc_max_bc_payload(struct rpc_clnt *clnt)
> }
> EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
> 
> +unsigned int rpc_num_bc_slots(struct rpc_clnt *clnt)
> +{
> +	struct rpc_xprt *xprt;
> +	unsigned int ret;
> +
> +	rcu_read_lock();
> +	xprt = rcu_dereference(clnt->cl_xprt);
> +	ret = xprt->ops->bc_num_slots(xprt);
> +	rcu_read_unlock();
> +	return ret;
> +}
> +EXPORT_SYMBOL_GPL(rpc_num_bc_slots);
> +
> /**
>  * rpc_force_rebind - force transport to check that remote port is unchanged
>  * @clnt: client to rebind
> diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
> index e15cb704453e..220b79988000 100644
> --- a/net/sunrpc/svc.c
> +++ b/net/sunrpc/svc.c
> @@ -1595,7 +1595,7 @@ bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
> 	/* Parse and execute the bc call */
> 	proc_error = svc_process_common(rqstp, argv, resv);
> 
> -	atomic_inc(&req->rq_xprt->bc_free_slots);
> +	atomic_dec(&req->rq_xprt->bc_slot_count);
> 	if (!proc_error) {
> 		/* Processing error: drop the request */
> 		xprt_free_bc_request(req);
> diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
> index ce986591f213..59e624b1d7a0 100644
> --- a/net/sunrpc/xprtrdma/backchannel.c
> +++ b/net/sunrpc/xprtrdma/backchannel.c
> @@ -52,6 +52,13 @@ size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
> 	return maxmsg - RPCRDMA_HDRLEN_MIN;
> }
> 
> +unsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *xprt)
> +{
> +	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
> +
> +	return r_xprt->rx_buf.rb_bc_srv_max_requests;
> +}
> +
> static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
> {
> 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
> diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
> index 4993aa49ecbe..52abddac19e5 100644
> --- a/net/sunrpc/xprtrdma/transport.c
> +++ b/net/sunrpc/xprtrdma/transport.c
> @@ -812,6 +812,7 @@ static const struct rpc_xprt_ops xprt_rdma_procs = {
> #if defined(CONFIG_SUNRPC_BACKCHANNEL)
> 	.bc_setup		= xprt_rdma_bc_setup,
> 	.bc_maxpayload		= xprt_rdma_bc_maxpayload,
> +	.bc_num_slots		= xprt_rdma_bc_max_slots,
> 	.bc_free_rqst		= xprt_rdma_bc_free_rqst,
> 	.bc_destroy		= xprt_rdma_bc_destroy,
> #endif
> diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
> index 8378f45d2da7..92ce09fcea74 100644
> --- a/net/sunrpc/xprtrdma/xprt_rdma.h
> +++ b/net/sunrpc/xprtrdma/xprt_rdma.h
> @@ -605,6 +605,7 @@ void xprt_rdma_cleanup(void);
> #if defined(CONFIG_SUNRPC_BACKCHANNEL)
> int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
> size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *);
> +unsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *);
> int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
> void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *);
> int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst);
> diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
> index 3c2cc96afcaa..6b1fca51028a 100644
> --- a/net/sunrpc/xprtsock.c
> +++ b/net/sunrpc/xprtsock.c
> @@ -2788,6 +2788,7 @@ static const struct rpc_xprt_ops xs_tcp_ops = {
> #ifdef CONFIG_SUNRPC_BACKCHANNEL
> 	.bc_setup		= xprt_setup_bc,
> 	.bc_maxpayload		= xs_tcp_bc_maxpayload,
> +	.bc_num_slots		= xprt_bc_max_slots,
> 	.bc_free_rqst		= xprt_free_bc_rqst,
> 	.bc_destroy		= xprt_destroy_bc,
> #endif
> -- 
> 2.21.0
> 

--
Chuck Lever
Trond Myklebust July 17, 2019, 5:19 p.m. UTC | #2
On Wed, 2019-07-17 at 09:55 -0400, Chuck Lever wrote:
> Hi Trond-
> 
> > On Jul 16, 2019, at 4:01 PM, Trond Myklebust <trondmy@gmail.com>
> > wrote:
> > 
> > Add a per-transport maximum limit in the socket case, and add
> > helpers to allow the NFSv4 code to discover that limit.
> 
> For RDMA, the number of credits is permitted to change during the life
> of the connection, so this is not a fixed limit for such transports.

This is defining a maximum value, which is used for backchannel session
slot negotiation.

> 
> And, AFAICT, it's not necessary to know the transport's limit. The
> lesser of the NFS backchannel and RPC/RDMA reverse credit limit will
> be used.

The server needs to know how many requests it can send in parallel on
the back channel. If it sends too many, which it can and will do on
TCP, then we currently break the connection, and so callbacks end up
being dropped or missed altogether.
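To make the overrun scenario concrete, here is a minimal userspace-style model (assumed names, no locking) of the hard cap the patch adds for the socket transports: once BC_MAX_SLOTS backchannel slots are outstanding, the client simply refuses to grant more, instead of letting the server overrun the connection.

#include <stdbool.h>
#include <stdio.h>

/*
 * Simplified, lock-free model of the cap added in backchannel_rqst.c.
 * Only bc_slot_count is modelled; bc_alloc_count/bc_alloc_max, which
 * track the preallocated list, are omitted for brevity.
 */
#define BC_MAX_SLOTS 64U

struct bc_model {
	unsigned int bc_slot_count;	/* all backchannel slots handed out */
};

/* Roughly the new check in xprt_get_bc_request() when bc_pa_list is empty. */
static bool bc_grab_dynamic_slot(struct bc_model *m)
{
	if (m->bc_slot_count >= BC_MAX_SLOTS)
		return false;		/* per-transport hard cap reached */
	m->bc_slot_count++;
	return true;
}

int main(void)
{
	struct bc_model m = { 0 };
	unsigned int granted = 0;

	/* A server sending callbacks without limit stops getting slots at 64. */
	while (bc_grab_dynamic_slot(&m))
		granted++;
	printf("granted %u backchannel slots, then refused\n", granted);
	return 0;
}
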
Chuck Lever July 17, 2019, 6:12 p.m. UTC | #3
> On Jul 17, 2019, at 1:19 PM, Trond Myklebust <trondmy@hammerspace.com> wrote:
> 
> On Wed, 2019-07-17 at 09:55 -0400, Chuck Lever wrote:
>> Hi Trond-
>> 
>>> On Jul 16, 2019, at 4:01 PM, Trond Myklebust <trondmy@gmail.com>
>>> wrote:
>>> 
>>> Add a per-transport maximum limit in the socket case, and add
>>> helpers to allow the NFSv4 code to discover that limit.
>> 
>> For RDMA, the number of credits is permitted to change during the life
>> of the connection, so this is not a fixed limit for such transports.
> 
> This is defining a maximum value, which is used for backchannel session
> slot negotiation.
> 
>> 
>> And, AFAICT, it's not necessary to know the transport's limit. The
>> lesser of the NFS backchannel and RPC/RDMA reverse credit limit will
>> be used.
> 
> The server needs to know how many requests it can send in parallel on
> the back channel. If it sends too many, which it can and will do on
> TCP, then we currently break the connection, and so callbacks end up
> being dropped or missed altogether.

IIUC, RPC/RDMA has a fixed maximum constant. Would you like a patch at
some point that advertises that constant via ->bc_num_slots?


--
Chuck Lever
Trond Myklebust July 17, 2019, 6:20 p.m. UTC | #4
On Wed, 2019-07-17 at 14:12 -0400, Chuck Lever wrote:
> > On Jul 17, 2019, at 1:19 PM, Trond Myklebust <trondmy@hammerspace.com>
> > wrote:
> > 
> > On Wed, 2019-07-17 at 09:55 -0400, Chuck Lever wrote:
> > > Hi Trond-
> > > 
> > > > On Jul 16, 2019, at 4:01 PM, Trond Myklebust <trondmy@gmail.com>
> > > > wrote:
> > > > 
> > > > Add a per-transport maximum limit in the socket case, and add
> > > > helpers to allow the NFSv4 code to discover that limit.
> > > 
> > > For RDMA, the number of credits is permitted to change during the
> > > life of the connection, so this is not a fixed limit for such
> > > transports.
> > 
> > This is defining a maximum value, which is used for backchannel
> > session slot negotiation.
> > 
> > > And, AFAICT, it's not necessary to know the transport's limit. The
> > > lesser of the NFS backchannel and RPC/RDMA reverse credit limit will
> > > be used.
> > 
> > The server needs to know how many requests it can send in parallel on
> > the back channel. If it sends too many, which it can and will do on
> > TCP, then we currently break the connection, and so callbacks end up
> > being dropped or missed altogether.
> 
> IIUC, RPC/RDMA has a fixed maximum constant. Would you like a patch at
> some point that advertises that constant via ->bc_num_slots?
> 

Sure. Thanks!
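
A follow-up along the lines Chuck offers would presumably have the RDMA transport report a fixed constant through ->bc_num_slots instead of the per-connection rb_bc_srv_max_requests value. A hypothetical sketch of that shape follows; the constant name and value are placeholders, not actual xprtrdma symbols.

/*
 * Hypothetical sketch only: advertise a fixed reverse-direction credit
 * limit via ->bc_num_slots.  Placeholder constant, not a real kernel symbol.
 */
struct rpc_xprt;	/* opaque here; defined in include/linux/sunrpc/xprt.h */

#define RPCRDMA_BC_SLOTS_PLACEHOLDER 32U

unsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *xprt)
{
	return RPCRDMA_BC_SLOTS_PLACEHOLDER;
}
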

Patch

diff --git a/fs/nfs/nfs4proc.c b/fs/nfs/nfs4proc.c
index 52de7245a2ee..39896afc6edf 100644
--- a/fs/nfs/nfs4proc.c
+++ b/fs/nfs/nfs4proc.c
@@ -8380,6 +8380,7 @@  static void nfs4_init_channel_attrs(struct nfs41_create_session_args *args,
 {
 	unsigned int max_rqst_sz, max_resp_sz;
 	unsigned int max_bc_payload = rpc_max_bc_payload(clnt);
+	unsigned int max_bc_slots = rpc_num_bc_slots(clnt);
 
 	max_rqst_sz = NFS_MAX_FILE_IO_SIZE + nfs41_maxwrite_overhead;
 	max_resp_sz = NFS_MAX_FILE_IO_SIZE + nfs41_maxread_overhead;
@@ -8402,6 +8403,8 @@  static void nfs4_init_channel_attrs(struct nfs41_create_session_args *args,
 	args->bc_attrs.max_resp_sz_cached = 0;
 	args->bc_attrs.max_ops = NFS4_MAX_BACK_CHANNEL_OPS;
 	args->bc_attrs.max_reqs = max_t(unsigned short, max_session_cb_slots, 1);
+	if (args->bc_attrs.max_reqs > max_bc_slots)
+		args->bc_attrs.max_reqs = max_bc_slots;
 
 	dprintk("%s: Back Channel : max_rqst_sz=%u max_resp_sz=%u "
 		"max_resp_sz_cached=%u max_ops=%u max_reqs=%u\n",
diff --git a/include/linux/sunrpc/bc_xprt.h b/include/linux/sunrpc/bc_xprt.h
index d4229a78524a..87d27e13d885 100644
--- a/include/linux/sunrpc/bc_xprt.h
+++ b/include/linux/sunrpc/bc_xprt.h
@@ -43,6 +43,7 @@  void xprt_destroy_backchannel(struct rpc_xprt *, unsigned int max_reqs);
 int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs);
 void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs);
 void xprt_free_bc_rqst(struct rpc_rqst *req);
+unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt);
 
 /*
  * Determine if a shared backchannel is in use
diff --git a/include/linux/sunrpc/clnt.h b/include/linux/sunrpc/clnt.h
index 4e070e00c143..abc63bd1be2b 100644
--- a/include/linux/sunrpc/clnt.h
+++ b/include/linux/sunrpc/clnt.h
@@ -194,6 +194,7 @@  void		rpc_setbufsize(struct rpc_clnt *, unsigned int, unsigned int);
 struct net *	rpc_net_ns(struct rpc_clnt *);
 size_t		rpc_max_payload(struct rpc_clnt *);
 size_t		rpc_max_bc_payload(struct rpc_clnt *);
+unsigned int	rpc_num_bc_slots(struct rpc_clnt *);
 void		rpc_force_rebind(struct rpc_clnt *);
 size_t		rpc_peeraddr(struct rpc_clnt *, struct sockaddr *, size_t);
 const char	*rpc_peeraddr2str(struct rpc_clnt *, enum rpc_display_format_t);
diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index ed76e5fb36c1..13e108bcc9eb 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -158,6 +158,7 @@  struct rpc_xprt_ops {
 	int		(*bc_setup)(struct rpc_xprt *xprt,
 				    unsigned int min_reqs);
 	size_t		(*bc_maxpayload)(struct rpc_xprt *xprt);
+	unsigned int	(*bc_num_slots)(struct rpc_xprt *xprt);
 	void		(*bc_free_rqst)(struct rpc_rqst *rqst);
 	void		(*bc_destroy)(struct rpc_xprt *xprt,
 				      unsigned int max_reqs);
@@ -251,8 +252,9 @@  struct rpc_xprt {
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 	struct svc_serv		*bc_serv;       /* The RPC service which will */
 						/* process the callback */
-	int			bc_alloc_count;	/* Total number of preallocs */
-	atomic_t		bc_free_slots;
+	unsigned int		bc_alloc_max;
+	unsigned int		bc_alloc_count;	/* Total number of preallocs */
+	atomic_t		bc_slot_count;	/* Number of allocated slots */
 	spinlock_t		bc_pa_lock;	/* Protects the preallocated
 						 * items */
 	struct list_head	bc_pa_list;	/* List of preallocated
diff --git a/net/sunrpc/backchannel_rqst.c b/net/sunrpc/backchannel_rqst.c
index c47d82622fd1..339e8c077c2d 100644
--- a/net/sunrpc/backchannel_rqst.c
+++ b/net/sunrpc/backchannel_rqst.c
@@ -31,25 +31,20 @@  SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 #define RPCDBG_FACILITY	RPCDBG_TRANS
 #endif
 
+#define BC_MAX_SLOTS	64U
+
+unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt)
+{
+	return BC_MAX_SLOTS;
+}
+
 /*
  * Helper routines that track the number of preallocation elements
  * on the transport.
  */
 static inline int xprt_need_to_requeue(struct rpc_xprt *xprt)
 {
-	return xprt->bc_alloc_count < atomic_read(&xprt->bc_free_slots);
-}
-
-static inline void xprt_inc_alloc_count(struct rpc_xprt *xprt, unsigned int n)
-{
-	atomic_add(n, &xprt->bc_free_slots);
-	xprt->bc_alloc_count += n;
-}
-
-static inline int xprt_dec_alloc_count(struct rpc_xprt *xprt, unsigned int n)
-{
-	atomic_sub(n, &xprt->bc_free_slots);
-	return xprt->bc_alloc_count -= n;
+	return xprt->bc_alloc_count < xprt->bc_alloc_max;
 }
 
 /*
@@ -145,6 +140,9 @@  int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
 
 	dprintk("RPC:       setup backchannel transport\n");
 
+	if (min_reqs > BC_MAX_SLOTS)
+		min_reqs = BC_MAX_SLOTS;
+
 	/*
 	 * We use a temporary list to keep track of the preallocated
 	 * buffers.  Once we're done building the list we splice it
@@ -172,7 +170,9 @@  int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
 	 */
 	spin_lock(&xprt->bc_pa_lock);
 	list_splice(&tmp_list, &xprt->bc_pa_list);
-	xprt_inc_alloc_count(xprt, min_reqs);
+	xprt->bc_alloc_count += min_reqs;
+	xprt->bc_alloc_max += min_reqs;
+	atomic_add(min_reqs, &xprt->bc_slot_count);
 	spin_unlock(&xprt->bc_pa_lock);
 
 	dprintk("RPC:       setup backchannel transport done\n");
@@ -220,11 +220,13 @@  void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
 		goto out;
 
 	spin_lock_bh(&xprt->bc_pa_lock);
-	xprt_dec_alloc_count(xprt, max_reqs);
+	xprt->bc_alloc_max -= max_reqs;
 	list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
 		dprintk("RPC:        req=%p\n", req);
 		list_del(&req->rq_bc_pa_list);
 		xprt_free_allocation(req);
+		xprt->bc_alloc_count--;
+		atomic_dec(&xprt->bc_slot_count);
 		if (--max_reqs == 0)
 			break;
 	}
@@ -241,13 +243,14 @@  static struct rpc_rqst *xprt_get_bc_request(struct rpc_xprt *xprt, __be32 xid,
 	struct rpc_rqst *req = NULL;
 
 	dprintk("RPC:       allocate a backchannel request\n");
-	if (atomic_read(&xprt->bc_free_slots) <= 0)
-		goto not_found;
 	if (list_empty(&xprt->bc_pa_list)) {
 		if (!new)
 			goto not_found;
+		if (atomic_read(&xprt->bc_slot_count) >= BC_MAX_SLOTS)
+			goto not_found;
 		list_add_tail(&new->rq_bc_pa_list, &xprt->bc_pa_list);
 		xprt->bc_alloc_count++;
+		atomic_inc(&xprt->bc_slot_count);
 	}
 	req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst,
 				rq_bc_pa_list);
@@ -291,6 +294,7 @@  void xprt_free_bc_rqst(struct rpc_rqst *req)
 	if (xprt_need_to_requeue(xprt)) {
 		list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
 		xprt->bc_alloc_count++;
+		atomic_inc(&xprt->bc_slot_count);
 		req = NULL;
 	}
 	spin_unlock_bh(&xprt->bc_pa_lock);
@@ -357,7 +361,7 @@  void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied)
 
 	spin_lock(&xprt->bc_pa_lock);
 	list_del(&req->rq_bc_pa_list);
-	xprt_dec_alloc_count(xprt, 1);
+	xprt->bc_alloc_count--;
 	spin_unlock(&xprt->bc_pa_lock);
 
 	req->rq_private_buf.len = copied;
diff --git a/net/sunrpc/clnt.c b/net/sunrpc/clnt.c
index 383555d2b522..79c849391cb9 100644
--- a/net/sunrpc/clnt.c
+++ b/net/sunrpc/clnt.c
@@ -1526,6 +1526,19 @@  size_t rpc_max_bc_payload(struct rpc_clnt *clnt)
 }
 EXPORT_SYMBOL_GPL(rpc_max_bc_payload);
 
+unsigned int rpc_num_bc_slots(struct rpc_clnt *clnt)
+{
+	struct rpc_xprt *xprt;
+	unsigned int ret;
+
+	rcu_read_lock();
+	xprt = rcu_dereference(clnt->cl_xprt);
+	ret = xprt->ops->bc_num_slots(xprt);
+	rcu_read_unlock();
+	return ret;
+}
+EXPORT_SYMBOL_GPL(rpc_num_bc_slots);
+
 /**
  * rpc_force_rebind - force transport to check that remote port is unchanged
  * @clnt: client to rebind
diff --git a/net/sunrpc/svc.c b/net/sunrpc/svc.c
index e15cb704453e..220b79988000 100644
--- a/net/sunrpc/svc.c
+++ b/net/sunrpc/svc.c
@@ -1595,7 +1595,7 @@  bc_svc_process(struct svc_serv *serv, struct rpc_rqst *req,
 	/* Parse and execute the bc call */
 	proc_error = svc_process_common(rqstp, argv, resv);
 
-	atomic_inc(&req->rq_xprt->bc_free_slots);
+	atomic_dec(&req->rq_xprt->bc_slot_count);
 	if (!proc_error) {
 		/* Processing error: drop the request */
 		xprt_free_bc_request(req);
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
index ce986591f213..59e624b1d7a0 100644
--- a/net/sunrpc/xprtrdma/backchannel.c
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -52,6 +52,13 @@  size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *xprt)
 	return maxmsg - RPCRDMA_HDRLEN_MIN;
 }
 
+unsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *xprt)
+{
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+
+	return r_xprt->rx_buf.rb_bc_srv_max_requests;
+}
+
 static int rpcrdma_bc_marshal_reply(struct rpc_rqst *rqst)
 {
 	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index 4993aa49ecbe..52abddac19e5 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -812,6 +812,7 @@  static const struct rpc_xprt_ops xprt_rdma_procs = {
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 	.bc_setup		= xprt_rdma_bc_setup,
 	.bc_maxpayload		= xprt_rdma_bc_maxpayload,
+	.bc_num_slots		= xprt_rdma_bc_max_slots,
 	.bc_free_rqst		= xprt_rdma_bc_free_rqst,
 	.bc_destroy		= xprt_rdma_bc_destroy,
 #endif
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index 8378f45d2da7..92ce09fcea74 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -605,6 +605,7 @@  void xprt_rdma_cleanup(void);
 #if defined(CONFIG_SUNRPC_BACKCHANNEL)
 int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
 size_t xprt_rdma_bc_maxpayload(struct rpc_xprt *);
+unsigned int xprt_rdma_bc_max_slots(struct rpc_xprt *);
 int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
 void rpcrdma_bc_receive_call(struct rpcrdma_xprt *, struct rpcrdma_rep *);
 int xprt_rdma_bc_send_reply(struct rpc_rqst *rqst);
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 3c2cc96afcaa..6b1fca51028a 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -2788,6 +2788,7 @@  static const struct rpc_xprt_ops xs_tcp_ops = {
 #ifdef CONFIG_SUNRPC_BACKCHANNEL
 	.bc_setup		= xprt_setup_bc,
 	.bc_maxpayload		= xs_tcp_bc_maxpayload,
+	.bc_num_slots		= xprt_bc_max_slots,
 	.bc_free_rqst		= xprt_free_bc_rqst,
 	.bc_destroy		= xprt_destroy_bc,
 #endif