[v1,07/18] xprtrdma: Pre-allocate backward rpc_rqst and send/receive buffers

Message ID 20150917204508.19671.23235.stgit@manet.1015granger.net (mailing list archive)
State New, archived

Commit Message

Chuck Lever Sept. 17, 2015, 8:45 p.m. UTC
xprtrdma's backward direction send and receive buffers are the same
size as the forechannel's inline threshold, and must be pre-
registered.

The consumer has no control over which receive buffer the adapter
chooses to catch an incoming backward-direction call. Any receive
buffer can be used for either a forward reply or a backward call.
Thus both types of RPC message must be the same size.
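
For illustration only (not part of the patch), the sizing follows the
forechannel inline thresholds, as rpcrdma_bc_setup_rqst() below does
for each pre-allocated backchannel request:

	size_t size;

	/* The RDMA header buffer is sized to the inline write threshold */
	size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);

	/* The send buffer adds the inline read threshold on top, and
	 * rq_snd_buf is set up to expose the whole buffer
	 */
	size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
	rqst->rq_snd_buf.buflen = size;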

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 net/sunrpc/xprtrdma/Makefile      |    1 
 net/sunrpc/xprtrdma/backchannel.c |  204 +++++++++++++++++++++++++++++++++++++
 net/sunrpc/xprtrdma/transport.c   |    7 +
 net/sunrpc/xprtrdma/verbs.c       |   92 ++++++++++++++---
 net/sunrpc/xprtrdma/xprt_rdma.h   |   20 ++++
 5 files changed, 309 insertions(+), 15 deletions(-)
 create mode 100644 net/sunrpc/xprtrdma/backchannel.c


Comments

Devesh Sharma Sept. 21, 2015, 10:28 a.m. UTC | #1
Looks good.

Patch

diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index 48913de..33f99d3 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -5,3 +5,4 @@  rpcrdma-y := transport.o rpc_rdma.o verbs.o \
 	svc_rdma.o svc_rdma_transport.o \
 	svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \
 	module.o
+rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o
diff --git a/net/sunrpc/xprtrdma/backchannel.c b/net/sunrpc/xprtrdma/backchannel.c
new file mode 100644
index 0000000..c0a42ad
--- /dev/null
+++ b/net/sunrpc/xprtrdma/backchannel.c
@@ -0,0 +1,204 @@ 
+/*
+ * Copyright (c) 2015 Oracle.  All rights reserved.
+ *
+ * Support for backward direction RPCs on RPC/RDMA.
+ */
+
+#include <linux/module.h>
+
+#include "xprt_rdma.h"
+
+#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
+# define RPCDBG_FACILITY	RPCDBG_TRANS
+#endif
+
+static void rpcrdma_bc_free_rqst(struct rpcrdma_xprt *r_xprt,
+				 struct rpc_rqst *rqst)
+{
+	struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
+	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
+
+	spin_lock(&buf->rb_reqslock);
+	list_del(&req->rl_all);
+	spin_unlock(&buf->rb_reqslock);
+
+	rpcrdma_destroy_req(&r_xprt->rx_ia, req);
+
+	kfree(rqst);
+}
+
+static int rpcrdma_bc_setup_rqst(struct rpcrdma_xprt *r_xprt,
+				 struct rpc_rqst *rqst)
+{
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+	struct rpcrdma_regbuf *rb;
+	struct rpcrdma_req *req;
+	struct xdr_buf *buf;
+	size_t size;
+
+	req = rpcrdma_create_req(r_xprt);
+	if (!req)
+		return -ENOMEM;
+	req->rl_backchannel = true;
+
+	size = RPCRDMA_INLINE_WRITE_THRESHOLD(rqst);
+	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
+	if (IS_ERR(rb))
+		goto out_fail;
+	req->rl_rdmabuf = rb;
+
+	size += RPCRDMA_INLINE_READ_THRESHOLD(rqst);
+	rb = rpcrdma_alloc_regbuf(ia, size, GFP_KERNEL);
+	if (IS_ERR(rb))
+		goto out_fail;
+	rb->rg_owner = req;
+	req->rl_sendbuf = rb;
+	/* so that rpcr_to_rdmar works when receiving a request */
+	rqst->rq_buffer = (void *)req->rl_sendbuf->rg_base;
+
+	buf = &rqst->rq_snd_buf;
+	buf->head[0].iov_base = rqst->rq_buffer;
+	buf->head[0].iov_len = 0;
+	buf->tail[0].iov_base = NULL;
+	buf->tail[0].iov_len = 0;
+	buf->page_len = 0;
+	buf->len = 0;
+	buf->buflen = size;
+
+	return 0;
+
+out_fail:
+	rpcrdma_bc_free_rqst(r_xprt, rqst);
+	return -ENOMEM;
+}
+
+/* Allocate and add receive buffers to the rpcrdma_buffer's existing
+ * list of reps. These are released when the transport is destroyed. */
+static int rpcrdma_bc_setup_reps(struct rpcrdma_xprt *r_xprt,
+				 unsigned int count)
+{
+	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
+	struct rpcrdma_rep *rep;
+	unsigned long flags;
+	int rc = 0;
+
+	while (count--) {
+		rep = rpcrdma_create_rep(r_xprt);
+		if (IS_ERR(rep)) {
+			pr_err("RPC:       %s: reply buffer alloc failed\n",
+			       __func__);
+			rc = PTR_ERR(rep);
+			break;
+		}
+
+		spin_lock_irqsave(&buffers->rb_lock, flags);
+		list_add(&rep->rr_list, &buffers->rb_recv_bufs);
+		spin_unlock_irqrestore(&buffers->rb_lock, flags);
+	}
+
+	return rc;
+}
+
+/**
+ * xprt_rdma_bc_setup - Pre-allocate resources for handling backchannel requests
+ * @xprt: transport associated with these backchannel resources
+ * @reqs: number of concurrent incoming requests to expect
+ *
+ * Returns 0 on success; otherwise a negative errno
+ */
+int xprt_rdma_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
+{
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
+	struct rpc_rqst *rqst;
+	unsigned int i;
+	int rc;
+
+	/* The backchannel reply path returns each rpc_rqst to the
+	 * bc_pa_list _after_ the reply is sent. If the server is
+	 * faster than the client, it can send another backward
+	 * direction request before the rpc_rqst is returned to the
+	 * list. The client rejects the request in this case.
+	 *
+	 * Twice as many rpc_rqsts are prepared to ensure there is
+	 * always an rpc_rqst available as soon as a reply is sent.
+	 */
+	for (i = 0; i < (reqs << 1); i++) {
+		rqst = kzalloc(sizeof(*rqst), GFP_KERNEL);
+		if (!rqst) {
+			pr_err("RPC:       %s: Failed to create bc rpc_rqst\n",
+			       __func__);
+			goto out_free;
+		}
+
+		rqst->rq_xprt = &r_xprt->rx_xprt;
+		INIT_LIST_HEAD(&rqst->rq_list);
+		INIT_LIST_HEAD(&rqst->rq_bc_list);
+
+		if (rpcrdma_bc_setup_rqst(r_xprt, rqst))
+			goto out_free;
+
+		spin_lock_bh(&xprt->bc_pa_lock);
+		list_add(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
+		spin_unlock_bh(&xprt->bc_pa_lock);
+	}
+
+	rc = rpcrdma_bc_setup_reps(r_xprt, reqs);
+	if (rc)
+		goto out_free;
+
+	rc = rpcrdma_ep_post_extra_recv(r_xprt, reqs);
+	if (rc)
+		goto out_free;
+
+	buffer->rb_bc_srv_max_requests = reqs;
+	request_module("svcrdma");
+
+	return 0;
+
+out_free:
+	xprt_rdma_bc_destroy(xprt, reqs);
+
+	pr_err("RPC:       %s: setup backchannel transport failed\n", __func__);
+	return -ENOMEM;
+}
+
+/**
+ * xprt_rdma_bc_destroy - Release resources for handling backchannel requests
+ * @xprt: transport associated with these backchannel resources
+ * @reqs: number of incoming requests to destroy; ignored
+ */
+void xprt_rdma_bc_destroy(struct rpc_xprt *xprt, unsigned int reqs)
+{
+	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
+	struct rpc_rqst *rqst, *tmp;
+
+	spin_lock_bh(&xprt->bc_pa_lock);
+	list_for_each_entry_safe(rqst, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
+		list_del(&rqst->rq_bc_pa_list);
+		spin_unlock_bh(&xprt->bc_pa_lock);
+
+		rpcrdma_bc_free_rqst(r_xprt, rqst);
+
+		spin_lock_bh(&xprt->bc_pa_lock);
+	}
+	spin_unlock_bh(&xprt->bc_pa_lock);
+}
+
+/**
+ * xprt_rdma_bc_free_rqst - Release a backchannel rqst
+ * @rqst: request to release
+ */
+void xprt_rdma_bc_free_rqst(struct rpc_rqst *rqst)
+{
+	struct rpc_xprt *xprt = rqst->rq_xprt;
+
+	smp_mb__before_atomic();
+	WARN_ON_ONCE(!test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state));
+	clear_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state);
+	smp_mb__after_atomic();
+
+	spin_lock_bh(&xprt->bc_pa_lock);
+	list_add_tail(&rqst->rq_bc_pa_list, &xprt->bc_pa_list);
+	spin_unlock_bh(&xprt->bc_pa_lock);
+}
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index e9e5ed7..e3871a6 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -705,7 +705,12 @@  static struct rpc_xprt_ops xprt_rdma_procs = {
 	.print_stats		= xprt_rdma_print_stats,
 	.enable_swap		= xprt_rdma_enable_swap,
 	.disable_swap		= xprt_rdma_disable_swap,
-	.inject_disconnect	= xprt_rdma_inject_disconnect
+	.inject_disconnect	= xprt_rdma_inject_disconnect,
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+	.bc_setup		= xprt_rdma_bc_setup,
+	.bc_free_rqst		= xprt_rdma_bc_free_rqst,
+	.bc_destroy		= xprt_rdma_bc_destroy,
+#endif
 };
 
 static struct xprt_class xprt_rdma = {
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index 8d99214..1e4a948 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -877,7 +877,22 @@  retry:
 		}
 		rc = ep->rep_connected;
 	} else {
+		struct rpcrdma_xprt *r_xprt;
+		unsigned int extras;
+
 		dprintk("RPC:       %s: connected\n", __func__);
+
+		r_xprt = container_of(ia, struct rpcrdma_xprt, rx_ia);
+		extras = r_xprt->rx_buf.rb_bc_srv_max_requests;
+
+		if (extras) {
+			rc = rpcrdma_ep_post_extra_recv(r_xprt, extras);
+			if (rc)
+				pr_err("%s: could not post "
+				       "extra receive buffers: %i\n",
+				       __func__, rc);
+			rc = 0;
+		}
 	}
 
 out:
@@ -914,20 +929,25 @@  rpcrdma_ep_disconnect(struct rpcrdma_ep *ep, struct rpcrdma_ia *ia)
 	}
 }
 
-static struct rpcrdma_req *
+struct rpcrdma_req *
 rpcrdma_create_req(struct rpcrdma_xprt *r_xprt)
 {
+	struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
 	struct rpcrdma_req *req;
 
 	req = kzalloc(sizeof(*req), GFP_KERNEL);
 	if (req == NULL)
 		return ERR_PTR(-ENOMEM);
 
+	INIT_LIST_HEAD(&req->rl_free);
+	spin_lock(&buffer->rb_reqslock);
+	list_add(&req->rl_all, &buffer->rb_allreqs);
+	spin_unlock(&buffer->rb_reqslock);
 	req->rl_buffer = &r_xprt->rx_buf;
 	return req;
 }
 
-static struct rpcrdma_rep *
+struct rpcrdma_rep *
 rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
 {
 	struct rpcrdma_create_data_internal *cdata = &r_xprt->rx_data;
@@ -965,6 +985,7 @@  rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 	int i, rc;
 
 	buf->rb_max_requests = r_xprt->rx_data.max_requests;
+	buf->rb_bc_srv_max_requests = 0;
 	spin_lock_init(&buf->rb_lock);
 
 	rc = ia->ri_ops->ro_init(r_xprt);
@@ -972,6 +993,8 @@  rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 		goto out;
 
 	INIT_LIST_HEAD(&buf->rb_send_bufs);
+	INIT_LIST_HEAD(&buf->rb_allreqs);
+	spin_lock_init(&buf->rb_reqslock);
 	for (i = 0; i < buf->rb_max_requests; i++) {
 		struct rpcrdma_req *req;
 
@@ -982,6 +1005,7 @@  rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
 			rc = PTR_ERR(req);
 			goto out;
 		}
+		req->rl_backchannel = false;
 		list_add(&req->rl_free, &buf->rb_send_bufs);
 	}
 
@@ -1008,19 +1032,13 @@  out:
 static void
 rpcrdma_destroy_rep(struct rpcrdma_ia *ia, struct rpcrdma_rep *rep)
 {
-	if (!rep)
-		return;
-
 	rpcrdma_free_regbuf(ia, rep->rr_rdmabuf);
 	kfree(rep);
 }
 
-static void
+void
 rpcrdma_destroy_req(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
 {
-	if (!req)
-		return;
-
 	rpcrdma_free_regbuf(ia, req->rl_sendbuf);
 	rpcrdma_free_regbuf(ia, req->rl_rdmabuf);
 	kfree(req);
@@ -1040,14 +1058,20 @@  rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
 		rpcrdma_destroy_rep(ia, rep);
 	}
 
-	while (!list_empty(&buf->rb_send_bufs)) {
-		struct rpcrdma_req *req = list_entry(buf->rb_send_bufs.next,
+	spin_lock(&buf->rb_reqslock);
+	while (!list_empty(&buf->rb_allreqs)) {
+		struct rpcrdma_req *req = list_entry(buf->rb_allreqs.next,
 						     struct rpcrdma_req,
-						     rl_free);
+						     rl_all);
+
+		list_del(&req->rl_all);
+		spin_unlock(&buf->rb_reqslock);
 
-		list_del(&req->rl_free);
 		rpcrdma_destroy_req(ia, req);
+
+		spin_lock(&buf->rb_reqslock);
 	}
+	spin_unlock(&buf->rb_reqslock);
 
 	ia->ri_ops->ro_destroy(buf);
 }
@@ -1094,7 +1118,7 @@  rpcrdma_buffer_get_locked(struct rpcrdma_buffer *buf)
 
 	rep = list_first_entry(&buf->rb_recv_bufs,
 			       struct rpcrdma_rep, rr_list);
-	list_del(&rep->rr_list);
+	list_del_init(&rep->rr_list);
 
 	return rep;
 }
@@ -1337,6 +1361,46 @@  rpcrdma_ep_post_recv(struct rpcrdma_ia *ia,
 	return rc;
 }
 
+/**
+ * rpcrdma_ep_post_extra_recv - Post buffers to catch incoming backchannel requests
+ * @r_xprt: transport associated with these backchannel resources
+ * @count: minimum number of incoming requests expected
+ *
+ * Returns zero if all requested buffers were posted, or a negative errno.
+ */
+int
+rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *r_xprt, unsigned int count)
+{
+	struct rpcrdma_ia *ia = &r_xprt->rx_ia;
+	struct rpcrdma_ep *ep = &r_xprt->rx_ep;
+	struct rpcrdma_buffer *buffers = &r_xprt->rx_buf;
+	struct rpcrdma_rep *rep;
+	unsigned long flags;
+	int rc;
+
+	while (count--) {
+		rep = NULL;
+		spin_lock_irqsave(&buffers->rb_lock, flags);
+		if (!list_empty(&buffers->rb_recv_bufs))
+			rep = rpcrdma_buffer_get_locked(buffers);
+		spin_unlock_irqrestore(&buffers->rb_lock, flags);
+		if (!rep) {
+			pr_err("%s: no extra receive buffers\n", __func__);
+			return -ENOMEM;
+		}
+
+		rc = rpcrdma_ep_post_recv(ia, ep, rep);
+		if (rc) {
+			spin_lock_irqsave(&buffers->rb_lock, flags);
+			rpcrdma_buffer_put_locked(rep, buffers);
+			spin_unlock_irqrestore(&buffers->rb_lock, flags);
+			return rc;
+		}
+	}
+
+	return 0;
+}
+
 /* How many chunk list items fit within our inline buffers?
  */
 unsigned int
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index e6a358f..2ca0567 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -262,6 +262,9 @@  struct rpcrdma_req {
 	struct rpcrdma_regbuf	*rl_rdmabuf;
 	struct rpcrdma_regbuf	*rl_sendbuf;
 	struct rpcrdma_mr_seg	rl_segments[RPCRDMA_MAX_SEGS];
+
+	struct list_head	rl_all;
+	bool			rl_backchannel;
 };
 
 static inline struct rpcrdma_req *
@@ -290,6 +293,10 @@  struct rpcrdma_buffer {
 	struct list_head	rb_send_bufs;
 	struct list_head	rb_recv_bufs;
 	u32			rb_max_requests;
+
+	u32			rb_bc_srv_max_requests;
+	spinlock_t		rb_reqslock;	/* protect rb_allreqs */
+	struct list_head	rb_allreqs;
 };
 #define rdmab_to_ia(b) (&container_of((b), struct rpcrdma_xprt, rx_buf)->rx_ia)
 
@@ -410,6 +417,9 @@  int rpcrdma_ep_post_recv(struct rpcrdma_ia *, struct rpcrdma_ep *,
 /*
  * Buffer calls - xprtrdma/verbs.c
  */
+struct rpcrdma_req *rpcrdma_create_req(struct rpcrdma_xprt *);
+struct rpcrdma_rep *rpcrdma_create_rep(struct rpcrdma_xprt *);
+void rpcrdma_destroy_req(struct rpcrdma_ia *, struct rpcrdma_req *);
 int rpcrdma_buffer_create(struct rpcrdma_xprt *);
 void rpcrdma_buffer_destroy(struct rpcrdma_buffer *);
 
@@ -426,6 +436,7 @@  void rpcrdma_free_regbuf(struct rpcrdma_ia *,
 			 struct rpcrdma_regbuf *);
 
 unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
+int rpcrdma_ep_post_extra_recv(struct rpcrdma_xprt *, unsigned int);
 
 int frwr_alloc_recovery_wq(void);
 void frwr_destroy_recovery_wq(void);
@@ -490,6 +501,15 @@  int rpcrdma_marshal_req(struct rpc_rqst *);
 int xprt_rdma_init(void);
 void xprt_rdma_cleanup(void);
 
+/* Backchannel calls - xprtrdma/backchannel.c
+ */
+#if defined(CONFIG_SUNRPC_BACKCHANNEL)
+int xprt_rdma_bc_setup(struct rpc_xprt *, unsigned int);
+int rpcrdma_bc_post_recv(struct rpcrdma_xprt *, unsigned int);
+void xprt_rdma_bc_free_rqst(struct rpc_rqst *);
+void xprt_rdma_bc_destroy(struct rpc_xprt *, unsigned int);
+#endif	/* CONFIG_SUNRPC_BACKCHANNEL */
+
 /* Temporary NFS request map cache. Created in svc_rdma.c  */
 extern struct kmem_cache *svc_rdma_map_cachep;
 /* WR context cache. Created in svc_rdma.c  */
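
The new backchannel methods are reached through struct rpc_xprt_ops, so
generic SUNRPC code never has to know it is talking to an RDMA transport.
A minimal sketch of such a caller follows; the function name is
hypothetical and only the ops wired up by this patch are assumed:

	/* Hypothetical caller: set up transport-specific backchannel
	 * resources, if the transport provides a bc_setup method.
	 */
	static int example_bc_setup(struct rpc_xprt *xprt, unsigned int reqs)
	{
		if (!xprt->ops->bc_setup)
			return -EINVAL;	/* no backchannel support */

		/* For xprtrdma this resolves to xprt_rdma_bc_setup(),
		 * which pre-allocates 2 * reqs rpc_rqsts and posts
		 * reqs extra receive buffers.
		 */
		return xprt->ops->bc_setup(xprt, reqs);
	}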