Message ID | 20150917204508.19671.23235.stgit@manet.1015granger.net (mailing list archive) |
---|---|
State | Not Applicable |
Looks good.

On Fri, Sep 18, 2015 at 2:15 AM, Chuck Lever <chuck.lever@oracle.com> wrote:
> xprtrdma's backward direction send and receive buffers are the same
> size as the forechannel's inline threshold, and must be pre-registered.
>
> The consumer has no control over which receive buffer the adapter
> chooses to catch an incoming backward-direction call. Any receive
> buffer can be used for either a forward reply or a backward call, so
> both types of RPC message must be the same size.
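The sizing rule in the patch description below is mechanical: the backchannel header buffer (rl_rdmabuf) is sized to the inline write threshold, and the send buffer (rl_sendbuf) to the sum of the write and read thresholds, so one pre-registered buffer can stage either kind of message. A minimal user-space sketch of that arithmetic follows; the threshold values are invented and the macros only mimic xprtrdma's RPCRDMA_INLINE_WRITE_THRESHOLD and RPCRDMA_INLINE_READ_THRESHOLD, so treat this as an illustration, not kernel code.

#include <stddef.h>
#include <stdio.h>

/* Invented stand-ins for RPCRDMA_INLINE_WRITE_THRESHOLD(rqst) and
 * RPCRDMA_INLINE_READ_THRESHOLD(rqst); the real values are negotiated
 * per connection.
 */
#define INLINE_WRITE_THRESHOLD	1024
#define INLINE_READ_THRESHOLD	1024

int main(void)
{
	/* rl_rdmabuf: holds the RPC-over-RDMA header of a backward call */
	size_t rdmabuf_size = INLINE_WRITE_THRESHOLD;

	/* rl_sendbuf: sized so it can stage either message type */
	size_t sendbuf_size = INLINE_WRITE_THRESHOLD + INLINE_READ_THRESHOLD;

	printf("rdmabuf %zu bytes, sendbuf %zu bytes\n",
	       rdmabuf_size, sendbuf_size);
	return 0;
}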
xprtrdma's backward direction send and receive buffers are the same
size as the forechannel's inline threshold, and must be pre-registered.

The consumer has no control over which receive buffer the adapter
chooses to catch an incoming backward-direction call. Any receive
buffer can be used for either a forward reply or a backward call, so
both types of RPC message must be the same size.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 net/sunrpc/xprtrdma/Makefile      |    1 
 net/sunrpc/xprtrdma/backchannel.c |  204 +++++++++++++++++++++++++++++++++++++
 net/sunrpc/xprtrdma/transport.c   |    7 +
 net/sunrpc/xprtrdma/verbs.c       |   92 ++++++++++++++---
 net/sunrpc/xprtrdma/xprt_rdma.h   |   20 ++++
 5 files changed, 309 insertions(+), 15 deletions(-)
 create mode 100644 net/sunrpc/xprtrdma/backchannel.c

diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index 48913de..33f99d3 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -5,3 +5,4 @@ rpcrdma-y := transport.o rpc_rdma.o verbs.o \
 	svc_rdma.o svc_rdma_transport.o \
 	svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \
 	module.o
+rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
new file mode 100644
index 0000000..c0a42ad
--- /dev/null
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -0,0 +1,204 @@
+/*
+ * Copyright (c) 2015 Oracle. All rights reserved.
+ *
+ * Support for backward direction RPCs on RPC/RDMA.
+ */
+
+#include <linux/module.h>
+
+#include "xprt_rdma.h"
+
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
+# define RPCDBG_FACILITY	RPCDBG_TRANS
+#endif
+
+static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
+				 struct rpc_rqst *rqst)
+{
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+
+	spin_lock(&buf->rb_reqslock);
+	list_del(&req->rl_all);
+	spin_unlock(&buf->rb_reqslock);
+
+	rpcrdma_destroy_req(&r_xprt->rx_ia, req);
+
+	kfree(rqst);
+}
+
+static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
+				 struct rpc_rqst *rqst)
+{
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+	struct rpcrdma_regbuf *rb;
+	struct rpcrdma_req *req;
+	struct xdr_buf *buf;
+	size_t size;
+
+	req = rpcrdma_create_req(r_xprt);
+	if (!req)
+		return -ENOMEM;
+	req->rl_backchannel = true;
+
+	size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
+	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
+	if (IS_ERR(rb))
+		goto out_fail;
+	req->rl_rdmabuf = rb;
+
+	size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
+	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
+	if (IS_ERR(rb))
+		goto out_fail;
+	rb->rg_owner = req;
+	req->rl_sendbuf = rb;
+	/* so that rpcr_to_rdmar works when receiving a request */
+	rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;
+
+	buf = &rqst->rq_snd_buf;
+	buf->head[0].iov_base = rqst->rq_buffer;
+	buf->head[0].iov_len = 0;
+	buf->tail[0].iov_base = NULL;
+	buf->tail[0].iov_len = 0;
+	buf->page_len = 0;
+	buf->len = 0;
+	buf->buflen = size;
+
+	return 0;
+
+out_fail:
+	rpcrdma_bc_free_rqst(r_xprt, rqst);
+	return -ENOMEM;
+}
+
+/* Allocate and add receive buffers to the rpcrdma_buffer's existing
+ * list of rep's. These are released when the transport is destroyed.
+ */
+static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
+				 unsigned int count)
+{
+	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
+	struct rpcrdma_rep *rep;
+	unsigned long flags;
+	int rc = 0;
+
+	while (count--) {
+		rep = rpcrdma_create_rep(r_xprt);
+		if (IS_ERR(rep)) {
+			pr_err("RPC: %s: reply buffer alloc failed\n",
+			       __func__);
+			rc = PTR_ERR(rep);
+			break;
+		}
+
+		spin_lock_irqsave(&buffers->rb_lock, flags);
+		list_add(&rep->rr_list, &buffers->rb_recv_bufs);
+		spin_unlock_irqrestore(&buffers->rb_lock, flags);
+	}
+
+	return rc;
+}
+
+/**
+ * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
+ * @xprt: transport associated with these backchannel resources
+ * @reqs: number of concurrent incoming requests to expect
+ *
+ * Returns 0 on success; otherwise a negative errno
+ */
+int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
+{
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
+	struct rpc_rqst *rqst;
+	unsigned int i;
+	int rc;
+
+	/* The backchannel reply path returns each rpc_rqst to the
+	 * bc_pa_list _after_ the reply is sent. If the server is
+	 * faster than the client, it can send another backward
+	 * direction request before the rpc_rqst is returned to the
+	 * list. The client rejects the request in this case.
+	 *
+	 * Twice as many rpc_rqsts are prepared to ensure there is
+	 * always an rpc_rqst available as soon as a reply is sent.
+	 */
+	for (i = 0; i < (reqs << 1); i++) {
+		rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
+		if (!rqst) {
+			pr_err("RPC: %s: Failed to create bc rpc_rqst\n",
+			       __func__);
+			goto out_free;
+		}
+
+		rqst->rq_xprt = &r_xprt->rx_xprt;
+		INIT_LIST_HEAD(&rqst->rq_list);
+		INIT_LIST_HEAD(&rqst->rq_bc_list);
+
+		if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
+			goto out_free;
+
+		spin_lock_bh(&xprt->bc_pa_lock);
+		list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
+		spin_unlock_bh(&xprt->bc_pa_lock);
+	}
+
+	rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
+	if (rc)
+		goto out_free;
+
+	rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
+	if (rc)
+		goto out_free;
+
+	buffer->rb_bc_srv_max_requests = reqs;
+	request_module("svcrdma");
+
+	return 0;
+
+out_free:
+	xprt_rdma_bc_destroy(xprt, reqs);
+
+	pr_err("RPC: %s: setup backchannel transport failed\n", __func__);
+	return -ENOMEM;
+}
+
+/**
+ * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
+ * @xprt: transport associated with these backchannel resources
+ * @reqs: number of incoming requests to destroy; ignored
+ */
+void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
+{
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	struct rpc_rqst *rqst, *tmp;
+
+	spin_lock_bh(&xprt->bc_pa_lock);
+	list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
+		list_del(&rqst->rq_bc_pa_list);
+		spin_unlock_bh(&xprt->bc_pa_lock);
+
+		rpcrdma_bc_free_rqst(r_xprt, rqst);
+
+		spin_lock_bh(&xprt->bc_pa_lock);
+	}
+	spin_unlock_bh(&xprt->bc_pa_lock);
+}
+
+/**
+ * xprt_rdma_bc_free_rqst - Release a backchannel rqst
+ * @rqst: request to release
+ */
+void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
+{
+	struct rpc_xprt *xprt = rqst->rq_xprt;
+
+	smp_mb__before_atomic();
+	WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
+	clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
+	smp_mb__after_atomic();
+
+	spin_lock_bh(&xprt->bc_pa_lock);
+	list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
+	spin_unlock_bh(&xprt->bc_pa_lock);
+}
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index e9e5ed7..e3871a6 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -705,7 +705,12 @@ static struct rpc_xprt_ops xprt_rdma_procs = {
 	.print_stats		= xprt_rdma_print_stats,
 	.enable_swap		= xprt_rdma_enable_swap,
 	.disable_swap		= xprt_rdma_disable_swap,
-	.inject_disconnect	= xprt_rdma_inject_disconnect
+	.inject_disconnect	= xprt_rdma_inject_disconnect,
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+	.bc_setup		= xprt_rdma_bc_setup,
+	.bc_free_rqst		= xprt_rdma_bc_free_rqst,
+	.bc_destroy		= xprt_rdma_bc_destroy,
+#endif
 };
 
 static struct xprt_class xprt_rdma = {
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 8d99214..1e4a948 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -877,7 +877,22 @@ retry:
 		}
 		rc = ep->rep_connected;
 	} else {
+		struct rpcrdma_xprt *r_xprt;
+		unsigned int extras;
+
 		dprintk("RPC: %s: connected\n", __func__);
+
+		r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
+		extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
+
+		if (extras) {
+			rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
+			if (rc)
+				pr_err("%s: could not post "
+				       "extra receive buffers: %i\n",
+				       __func__, rc);
+			rc = 0;
+		}
 	}
 
 out:
@@ -914,20 +929,25 @@ rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 	}
 }
 
-static struct rpcrdma_req *
+struct rpcrdma_req *
 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
 {
+	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
 	struct rpcrdma_req *req;
 
 	req = kzalloc(sizeof(*req), GFP_KERNEL);
 	if (req == NULL)
 		return ERR_PTR(-ENOMEM);
 
+	INIT_LIST_HEAD(&req->rl_free);
+	spin_lock(&buffer->rb_reqslock);
+	list_add(&req->rl_all, &buffer->rb_allreqs);
+	spin_unlock(&buffer->rb_reqslock);
 	req->rl_buffer = &r_xprt->rx_buf;
 	return req;
 }
 
-static struct rpcrdma_rep *
+struct rpcrdma_rep *
 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
 {
 	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
@@ -965,6 +985,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 	int i, rc;
 
 	buf->rb_max_requests = r_xprt->rx_data.max_requests;
+	buf->rb_bc_srv_max_requests = 0;
 	spin_lock_init(&buf->rb_lock);
 
 	rc = ia->ri_ops->ro_init(r_xprt);
@@ -972,6 +993,8 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 		goto out;
 
 	INIT_LIST_HEAD(&buf->rb_send_bufs);
+	INIT_LIST_HEAD(&buf->rb_allreqs);
+	spin_lock_init(&buf->rb_reqslock);
 	for (i = 0; i < buf->rb_max_requests; i++) {
 		struct rpcrdma_req *req;
 
@@ -982,6 +1005,7 @@ rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 			rc = PTR_ERR(req);
 			goto out;
 		}
+		req->rl_backchannel = false;
 		list_add(&req->rl_free, &buf->rb_send_bufs);
 	}
 
@@ -1008,19 +1032,13 @@ out:
 static void
 rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
 {
-	if (!rep)
-		return;
-
 	rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
 	kfree(rep);
 }
 
-static void
+void
 rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
 {
-	if (!req)
-		return;
-
 	rpcrdma_free_regbuf(ia, req->rl_sendbuf);
 	rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
 	kfree(req);
@@ -1040,14 +1058,20 @@ rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 		rpcrdma_destroy_rep(ia, rep);
 	}
 
-	while (!list_empty(&buf->rb_send_bufs)) {
-		struct rpcrdma_req *req = list_entry(buf->rb_send_bufs.next,
+	spin_lock(&buf->rb_reqslock);
+	while (!list_empty(&buf->rb_allreqs)) {
+		struct rpcrdma_req *req = list_entry(buf->rb_allreqs.next,
 						     struct rpcrdma_req,
-						     rl_free);
+						     rl_all);
+
+		list_del(&req->rl_all);
+		spin_unlock(&buf->rb_reqslock);
 
-		list_del(&req->rl_free);
 		rpcrdma_destroy_req(ia, req);
+
+		spin_lock(&buf->rb_reqslock);
 	}
+	spin_unlock(&buf->rb_reqslock);
 
 	ia->ri_ops->ro_destroy(buf);
 }
@@ -1094,7 +1118,7 @@ rpcrdma_buffer_get_locked(struct rpcrdma_buffer *buf)
 
 	rep = list_first_entry(&buf->rb_recv_bufs,
 			       struct rpcrdma_rep, rr_list);
-	list_del(&rep->rr_list);
+	list_del_init(&rep->rr_list);
 
 	return rep;
 }
@@ -1337,6 +1361,46 @@ rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
 	return rc;
 }
 
+/**
+ * rpcrdma_ep_post_extra_recv - Post buffers to catch incoming backchannel requests
+ * @r_xprt: transport associated with these backchannel resources
+ * @count: minimum number of incoming requests expected
+ *
+ * Returns zero if all requested buffers were posted, or a negative errno.
+ */
+int
+rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
+{
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
+	struct rpcrdma_rep *rep;
+	unsigned long flags;
+	int rc;
+
+	while (count--) {
+		rep = NULL;
+		spin_lock_irqsave(&buffers->rb_lock, flags);
+		if (!list_empty(&buffers->rb_recv_bufs))
+			rep = rpcrdma_buffer_get_locked(buffers);
+		spin_unlock_irqrestore(&buffers->rb_lock, flags);
+		if (!rep) {
+			pr_err("%s: no extra receive buffers\n", __func__);
+			return -ENOMEM;
+		}
+
+		rc = rpcrdma_ep_post_recv(ia, ep, rep);
+		if (rc) {
+			spin_lock_irqsave(&buffers->rb_lock, flags);
+			rpcrdma_buffer_put_locked(rep, buffers);
+			spin_unlock_irqrestore(&buffers->rb_lock, flags);
+			return rc;
+		}
+	}
+
+	return 0;
+}
+
 /* How many chunk list items fit within our inline buffers?
  */
 unsigned int
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index e6a358f..2ca0567 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -262,6 +262,9 @@ struct rpcrdma_req {
 	struct rpcrdma_regbuf	*rl_rdmabuf;
 	struct rpcrdma_regbuf	*rl_sendbuf;
 	struct rpcrdma_mr_seg	rl_segments[RPCRDMA_MAX_SEGS];
+
+	struct list_head	rl_all;
+	bool			rl_backchannel;
 };
 
 static inline struct rpcrdma_req *
@@ -290,6 +293,10 @@ struct rpcrdma_buffer {
 	struct list_head	rb_send_bufs;
 	struct list_head	rb_recv_bufs;
 	u32			rb_max_requests;
+
+	u32			rb_bc_srv_max_requests;
+	spinlock_t		rb_reqslock;	/* protect rb_allreqs */
+	struct list_head	rb_allreqs;
 };
 #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
 
@@ -410,6 +417,9 @@ int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *,
 /*
  * Buffer calls - xprtrdma/verbs.c
  */
+struct rpcrdma_req *rpcrdma_create_req(struct rpcrdma_xprt *);
+struct rpcrdma_rep *rpcrdma_create_rep(struct rpcrdma_xprt *);
+void rpcrdma_destroy_req(struct rpcrdma_ia *, struct rpcrdma_req *);
 int rpcrdma_buffer_create(struct rpcrdma_xprt *);
 void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
 
@@ -426,6 +436,7 @@ void rpcrdma_free_regbuf(struct rpcrdma_ia *,
 			 struct rpcrdma_regbuf *);
 
 unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
+int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int);
 
 int frwr_alloc_recovery_wq(void);
 void frwr_destroy_recovery_wq(void);
@@ -490,6 +501,15 @@ int rpcrdma_marshal_req(struct rpc_rqst *);
 int xprt_rdma_init(void);
 void xprt_rdma_cleanup(void);
 
+/* Backchannel calls - xprtrdma/backchannel.c
+ */
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
+int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
+void xprt_rdma_bc_free_rqst(struct rpc_rqst *);
+void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int);
+#endif	/* CONFIG_SUNRPC_BACKCHANNEL */
+
 /* Temporary NFS request map cache. Created in svc_rdma.c */
 extern struct kmem_cache *svc_rdma_map_cachep;
 /* WR context cache. Created in svc_rdma.c */
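The "twice as many rpc_rqsts" comment in xprt_rdma_bc_setup describes a plain over-provisioning scheme: an rpc_rqst goes back on bc_pa_list only after its reply has been sent, so a server that pipelines backward calls can need a second set of slots. A self-contained user-space sketch of that idea follows; every name in it is hypothetical and nothing below appears in the patch.

#include <stdio.h>
#include <stdlib.h>

/* Toy free list standing in for bc_pa_list. */
struct slot {
	struct slot *next;
};

static struct slot *free_list;

static void slot_put(struct slot *s)
{
	s->next = free_list;
	free_list = s;
}

static struct slot *slot_get(void)
{
	struct slot *s = free_list;

	if (s)
		free_list = s->next;
	return s;	/* NULL: the incoming call must be rejected */
}

int main(void)
{
	unsigned int reqs = 2, i;

	/* Provision twice the expected number of concurrent calls. */
	for (i = 0; i < (reqs << 1); i++) {
		struct slot *s = malloc(sizeof(*s));

		if (!s)
			return 1;
		slot_put(s);
	}

	/* Worst case: 'reqs' replies are still outstanding when the
	 * server sends 'reqs' new calls; 2 * reqs slots absorb them,
	 * and only a call beyond that is rejected.
	 */
	for (i = 0; i < (reqs << 1) + 1; i++)
		printf("call %u: %s\n", i,
		       slot_get() ? "accepted" : "rejected");
	return 0;
}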
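rpcrdma_ep_post_extra_recv has a take/post/put-back shape: remove a rep from the shared receive-buffer list, attempt to post it, and put it back on failure so the buffer is not leaked. Below is a user-space sketch of that loop with a stubbed-out post function standing in for the verbs-layer posting; the names are hypothetical, not the kernel API.

#include <stdio.h>

#define NBUFS 4

/* 1 = buffer is on the rb_recv_bufs-style free list */
static int avail[NBUFS];

static int buf_get(void)
{
	int i;

	for (i = 0; i < NBUFS; i++)
		if (avail[i]) {
			avail[i] = 0;
			return i;
		}
	return -1;
}

static void buf_put(int i)
{
	avail[i] = 1;
}

/* Stub for the real post operation; pretend buffer 2 fails to post. */
static int post_recv(int i)
{
	return i == 2 ? -1 : 0;
}

int main(void)
{
	unsigned int count = NBUFS;
	int i;

	for (i = 0; i < NBUFS; i++)
		avail[i] = 1;

	while (count--) {
		i = buf_get();
		if (i < 0) {
			fprintf(stderr, "no extra receive buffers\n");
			return 1;
		}
		if (post_recv(i)) {
			buf_put(i);	/* put back on failure */
			fprintf(stderr, "post of buffer %d failed\n", i);
			return 1;
		}
		printf("posted buffer %d\n", i);
	}
	return 0;
}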