
[v2,06/16] xprtrdma: Use workqueue to process RPC/RDMA replies

Message ID 20151006145924.11788.64757.stgit@manet.1015granger.net (mailing list archive)
State Not Applicable

Commit Message

Chuck Lever Oct. 6, 2015, 2:59 p.m. UTC
The reply tasklet is fast, but it's single threaded. After reply
traffic saturates a single CPU, there's no more reply processing
capacity.

Replace the tasklet with a workqueue to spread reply handling across
all CPUs.  This also moves RPC/RDMA reply handling out of the soft
IRQ context and into a context that allows sleeps.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
---
 net/sunrpc/xprtrdma/rpc_rdma.c  |   17 +++++++-----
 net/sunrpc/xprtrdma/transport.c |    8 ++++++
 net/sunrpc/xprtrdma/verbs.c     |   54 ++++++++++++++++++++++++++++++++-------
 net/sunrpc/xprtrdma/xprt_rdma.h |    4 +++
 4 files changed, 65 insertions(+), 18 deletions(-)
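
For quick reference, here is the core of the conversion condensed into one
place. This is a sketch distilled from the full diff in the Patch section
below; the rationale comments on the workqueue flags are interpretation,
not taken from the patch itself.

#include <linux/workqueue.h>	/* alloc_workqueue, queue_work */

static struct workqueue_struct *rpcrdma_receive_wq;

int
rpcrdma_alloc_wq(void)
{
	/* WQ_UNBOUND lets replies be handled on any CPU; WQ_MEM_RECLAIM
	 * is presumably needed because reply handling sits in the NFS
	 * writeback (memory reclaim) path.
	 */
	rpcrdma_receive_wq = alloc_workqueue("xprtrdma_receive",
				WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI, 0);
	return rpcrdma_receive_wq ? 0 : -ENOMEM;
}

/* One work item per reply; INIT_WORK() is done once per rep in
 * rpcrdma_create_rep(). Runs in process context, so it may sleep.
 */
static void
rpcrdma_receive_worker(struct work_struct *work)
{
	struct rpcrdma_rep *rep =
			container_of(work, struct rpcrdma_rep, rr_work);

	rpcrdma_reply_handler(rep);
}

/* Receive completion path: queue the rep's own work item instead of
 * adding the rep to the global tasklet list.
 */
static void
rpcrdma_recvcq_process_wc(struct ib_wc *wc)
{
	struct rpcrdma_rep *rep =
			(struct rpcrdma_rep *)(unsigned long)wc->wr_id;

	/* length and status checks elided */
	queue_work(rpcrdma_receive_wq, &rep->rr_work);
}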



Comments

Devesh Sharma Oct. 6, 2015, 6:30 p.m. UTC | #1
Looks good! I will send a test report with the ocrdma driver.

Reviewed-By: Devesh Sharma <devesh.sharma@avagotech.com>

On Tue, Oct 6, 2015 at 8:29 PM, Chuck Lever <chuck.lever@oracle.com> wrote:
> The reply tasklet is fast, but it's single threaded. After reply
> traffic saturates a single CPU, there's no more reply processing
> capacity.
>
> Replace the tasklet with a workqueue to spread reply handling across
> all CPUs.  This also moves RPC/RDMA reply handling out of the soft
> IRQ context and into a context that allows sleeps.
>
> Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
> ---
>  net/sunrpc/xprtrdma/rpc_rdma.c  |   17 +++++++-----
>  net/sunrpc/xprtrdma/transport.c |    8 ++++++
>  net/sunrpc/xprtrdma/verbs.c     |   54 ++++++++++++++++++++++++++++++++-------
>  net/sunrpc/xprtrdma/xprt_rdma.h |    4 +++
>  4 files changed, 65 insertions(+), 18 deletions(-)
>
Sagi Grimberg Oct. 7, 2015, 2:39 p.m. UTC | #2
On 10/6/2015 5:59 PM, Chuck Lever wrote:
> The reply tasklet is fast, but it's single threaded. After reply
> traffic saturates a single CPU, there's no more reply processing
> capacity.
>
> Replace the tasklet with a workqueue to spread reply handling across
> all CPUs.  This also moves RPC/RDMA reply handling out of the soft
> IRQ context and into a context that allows sleeps.

Hi Chuck,

I'm probably missing something here, but do you ever schedule in
the workqueue context? Don't you need to explicitly schedule after
a jiffy or so, so that the code also works in a non-fully-preemptible kernel?
Chuck Lever Oct. 7, 2015, 2:48 p.m. UTC | #3
> On Oct 7, 2015, at 10:39 AM, Sagi Grimberg <sagig@dev.mellanox.co.il> wrote:
> 
> On 10/6/2015 5:59 PM, Chuck Lever wrote:
>> The reply tasklet is fast, but it's single threaded. After reply
>> traffic saturates a single CPU, there's no more reply processing
>> capacity.
>> 
>> Replace the tasklet with a workqueue to spread reply handling across
>> all CPUs.  This also moves RPC/RDMA reply handling out of the soft
>> IRQ context and into a context that allows sleeps.
> 
> Hi Chuck,
> 
> I'm probably missing something here, but do you ever schedule in
> the workqueue context? Don't you need to explicitly schedule after
> a jiffy or so, so that the code also works in a non-fully-preemptible kernel?

Each RPC reply gets its own work request. This is unlike
the tasklet, which continues to run as long as there are
items on xprtrdma’s global tasklet queue.

I can’t think of anything in the current reply handler
that would take longer than a few microseconds to run,
unless there is lock contention on the transport_lock.

wake_up_bit can also be slow sometimes, but it schedules
internally.

Eventually the reply handler will also synchronously
perform R_key invalidation. In that case, I think there
will be an implicit schedule while waiting for the
invalidation to finish.
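
To make the contrast concrete, here is a rough before/after sketch. The
tasklet loop is reconstructed for illustration only; names and details are
from memory of the pre-patch verbs.c and may not be exact.

/* Before: one global tasklet drains every queued reply in softirq
 * context, so a single CPU keeps running this loop for as long as
 * replies keep arriving, and rpcrdma_reply_handler() must not sleep.
 * (Reconstruction for illustration -- not quoted from the tree.)
 */
static void
rpcrdma_run_tasklet(unsigned long data)
{
	struct rpcrdma_rep *rep;
	unsigned long flags;

	spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
	while (!list_empty(&rpcrdma_tasklets_g)) {
		rep = list_first_entry(&rpcrdma_tasklets_g,
				       struct rpcrdma_rep, rr_list);
		list_del(&rep->rr_list);
		spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);

		rpcrdma_reply_handler(rep);

		spin_lock_irqsave(&rpcrdma_tk_lock_g, flags);
	}
	spin_unlock_irqrestore(&rpcrdma_tk_lock_g, flags);
}

/* After: the unit of work is a single rpcrdma_reply_handler() call per
 * rpcrdma_rep, queued with queue_work(). The workqueue core schedules
 * between items, and because each item runs in process context it may
 * sleep, e.g. while waiting for a synchronous rkey invalidation to
 * complete.
 */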


--
Chuck Lever



Sagi Grimberg Oct. 7, 2015, 3:17 p.m. UTC | #4
On 10/7/2015 5:48 PM, Chuck Lever wrote:
>
>> On Oct 7, 2015, at 10:39 AM, Sagi Grimberg <sagig@dev.mellanox.co.il> wrote:
>>
>> On 10/6/2015 5:59 PM, Chuck Lever wrote:
>>> The reply tasklet is fast, but it's single threaded. After reply
>>> traffic saturates a single CPU, there's no more reply processing
>>> capacity.
>>>
>>> Replace the tasklet with a workqueue to spread reply handling across
>>> all CPUs.  This also moves RPC/RDMA reply handling out of the soft
>>> IRQ context and into a context that allows sleeps.
>>
>> Hi Chuck,
>>
>> I'm probably missing something here, but do you ever schedule in
>> the workqueue context? Don't you need to explicitly schedule after
>> a jiffy or so, so that the code also works in a non-fully-preemptible kernel?
>
> Each RPC reply gets its own work request. This is unlike
> the tasklet, which continues to run as long as there are
> items on xprtrdma’s global tasklet queue.

OK I understand now.

So this and the rest of the series looks good to me.

Reviewed-by: Sagi Grimberg <sagig@mellanox.com>

Patch

diff --git a/net/sunrpc/xprtrdma/rpc_rdma.c b/net/sunrpc/xprtrdma/rpc_rdma.c
index 60ffa63..95774fc 100644
--- a/net/sunrpc/xprtrdma/rpc_rdma.c
+++ b/net/sunrpc/xprtrdma/rpc_rdma.c
@@ -723,8 +723,8 @@  rpcrdma_conn_func(struct rpcrdma_ep *ep)
 	schedule_delayed_work(&ep->rep_connect_worker, 0);
 }
 
-/*
- * Called as a tasklet to do req/reply match and complete a request
+/* Process received RPC/RDMA messages.
+ *
  * Errors must result in the RPC task either being awakened, or
  * allowed to timeout, to discover the errors at that time.
  */
@@ -752,13 +752,14 @@  rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 	if (headerp->rm_vers != rpcrdma_version)
 		goto out_badversion;
 
-	/* Get XID and try for a match. */
-	spin_lock(&xprt->transport_lock);
+	/* Match incoming rpcrdma_rep to an rpcrdma_req to
+	 * get context for handling any incoming chunks.
+	 */
+	spin_lock_bh(&xprt->transport_lock);
 	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
 	if (!rqst)
 		goto out_nomatch;
 
-	/* get request object */
 	req = rpcr_to_rdmar(rqst);
 	if (req->rl_reply)
 		goto out_duplicate;
@@ -859,7 +860,7 @@  badheader:
 		xprt_release_rqst_cong(rqst->rq_task);
 
 	xprt_complete_rqst(rqst->rq_task, status);
-	spin_unlock(&xprt->transport_lock);
+	spin_unlock_bh(&xprt->transport_lock);
 	dprintk("RPC:       %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
 			__func__, xprt, rqst, status);
 	return;
@@ -882,14 +883,14 @@  out_badversion:
 	goto repost;
 
 out_nomatch:
-	spin_unlock(&xprt->transport_lock);
+	spin_unlock_bh(&xprt->transport_lock);
 	dprintk("RPC:       %s: no match for incoming xid 0x%08x len %d\n",
 		__func__, be32_to_cpu(headerp->rm_xid),
 		rep->rr_len);
 	goto repost;
 
 out_duplicate:
-	spin_unlock(&xprt->transport_lock);
+	spin_unlock_bh(&xprt->transport_lock);
 	dprintk("RPC:       %s: "
 		"duplicate reply %p to RPC request %p: xid 0x%08x\n",
 		__func__, rep, req, be32_to_cpu(headerp->rm_xid));
diff --git a/net/sunrpc/xprtrdma/transport.c b/net/sunrpc/xprtrdma/transport.c
index e9e5ed7..897a2f3 100644
--- a/net/sunrpc/xprtrdma/transport.c
+++ b/net/sunrpc/xprtrdma/transport.c
@@ -732,6 +732,7 @@  void xprt_rdma_cleanup(void)
 		dprintk("RPC:       %s: xprt_unregister returned %i\n",
 			__func__, rc);
 
+	rpcrdma_destroy_wq();
 	frwr_destroy_recovery_wq();
 }
 
@@ -743,8 +744,15 @@  int xprt_rdma_init(void)
 	if (rc)
 		return rc;
 
+	rc = rpcrdma_alloc_wq();
+	if (rc) {
+		frwr_destroy_recovery_wq();
+		return rc;
+	}
+
 	rc = xprt_register_transport(&xprt_rdma);
 	if (rc) {
+		rpcrdma_destroy_wq();
 		frwr_destroy_recovery_wq();
 		return rc;
 	}
diff --git a/net/sunrpc/xprtrdma/verbs.c b/net/sunrpc/xprtrdma/verbs.c
index ab26392..cf2f5b3 100644
--- a/net/sunrpc/xprtrdma/verbs.c
+++ b/net/sunrpc/xprtrdma/verbs.c
@@ -100,6 +100,35 @@  rpcrdma_run_tasklet(unsigned long data)
 
 static DECLARE_TASKLET(rpcrdma_tasklet_g, rpcrdma_run_tasklet, 0UL);
 
+static struct workqueue_struct *rpcrdma_receive_wq;
+
+int
+rpcrdma_alloc_wq(void)
+{
+	struct workqueue_struct *recv_wq;
+
+	recv_wq = alloc_workqueue("xprtrdma_receive",
+				  WQ_MEM_RECLAIM | WQ_UNBOUND | WQ_HIGHPRI,
+				  0);
+	if (!recv_wq)
+		return -ENOMEM;
+
+	rpcrdma_receive_wq = recv_wq;
+	return 0;
+}
+
+void
+rpcrdma_destroy_wq(void)
+{
+	struct workqueue_struct *wq;
+
+	if (rpcrdma_receive_wq) {
+		wq = rpcrdma_receive_wq;
+		rpcrdma_receive_wq = NULL;
+		destroy_workqueue(wq);
+	}
+}
+
 static void
 rpcrdma_schedule_tasklet(struct list_head *sched_list)
 {
@@ -196,7 +225,16 @@  rpcrdma_sendcq_upcall(struct ib_cq *cq, void *cq_context)
 }
 
 static void
-rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
+rpcrdma_receive_worker(struct work_struct *work)
+{
+	struct rpcrdma_rep *rep =
+			container_of(work, struct rpcrdma_rep, rr_work);
+
+	rpcrdma_reply_handler(rep);
+}
+
+static void
+rpcrdma_recvcq_process_wc(struct ib_wc *wc)
 {
 	struct rpcrdma_rep *rep =
 			(struct rpcrdma_rep *)(unsigned long)wc->wr_id;
@@ -219,8 +257,9 @@  rpcrdma_recvcq_process_wc(struct ib_wc *wc, struct list_head *sched_list)
 	prefetch(rdmab_to_msg(rep->rr_rdmabuf));
 
 out_schedule:
-	list_add_tail(&rep->rr_list, sched_list);
+	queue_work(rpcrdma_receive_wq, &rep->rr_work);
 	return;
+
 out_fail:
 	if (wc->status != IB_WC_WR_FLUSH_ERR)
 		pr_err("RPC:       %s: rep %p: %s\n",
@@ -239,7 +278,6 @@  static void
 rpcrdma_recvcq_poll(struct ib_cq *cq)
 {
 	struct ib_wc *pos, wcs[4];
-	LIST_HEAD(sched_list);
 	int count, rc;
 
 	do {
@@ -251,10 +289,8 @@  rpcrdma_recvcq_poll(struct ib_cq *cq)
 
 		count = rc;
 		while (count-- > 0)
-			rpcrdma_recvcq_process_wc(pos++, &sched_list);
+			rpcrdma_recvcq_process_wc(pos++);
 	} while (rc == ARRAY_SIZE(wcs));
-
-	rpcrdma_schedule_tasklet(&sched_list);
 }
 
 /* Handle provider receive completion upcalls.
@@ -272,12 +308,9 @@  static void
 rpcrdma_flush_cqs(struct rpcrdma_ep *ep)
 {
 	struct ib_wc wc;
-	LIST_HEAD(sched_list);
 
 	while (ib_poll_cq(ep->rep_attr.recv_cq, 1, &wc) > 0)
-		rpcrdma_recvcq_process_wc(&wc, &sched_list);
-	if (!list_empty(&sched_list))
-		rpcrdma_schedule_tasklet(&sched_list);
+		rpcrdma_recvcq_process_wc(&wc);
 	while (ib_poll_cq(ep->rep_attr.send_cq, 1, &wc) > 0)
 		rpcrdma_sendcq_process_wc(&wc);
 }
@@ -915,6 +948,7 @@  rpcrdma_create_rep(struct rpcrdma_xprt *r_xprt)
 
 	rep->rr_device = ia->ri_device;
 	rep->rr_rxprt = r_xprt;
+	INIT_WORK(&rep->rr_work, rpcrdma_receive_worker);
 	return rep;
 
 out_free:
diff --git a/net/sunrpc/xprtrdma/xprt_rdma.h b/net/sunrpc/xprtrdma/xprt_rdma.h
index e6a358f..6ea1dbe 100644
--- a/net/sunrpc/xprtrdma/xprt_rdma.h
+++ b/net/sunrpc/xprtrdma/xprt_rdma.h
@@ -164,6 +164,7 @@  struct rpcrdma_rep {
 	unsigned int		rr_len;
 	struct ib_device	*rr_device;
 	struct rpcrdma_xprt	*rr_rxprt;
+	struct work_struct	rr_work;
 	struct list_head	rr_list;
 	struct rpcrdma_regbuf	*rr_rdmabuf;
 };
@@ -430,6 +431,9 @@  unsigned int rpcrdma_max_segments(struct rpcrdma_xprt *);
 int frwr_alloc_recovery_wq(void);
 void frwr_destroy_recovery_wq(void);
 
+int rpcrdma_alloc_wq(void);
+void rpcrdma_destroy_wq(void);
+
 /*
  * Wrappers for chunk registration, shared by read/write chunk code.
  */