[v8,for-next] RDMA/rxe: Add workqueue support for tasks

Message ID 20230428171321.5774-1-rpearsonhpe@gmail.com (mailing list archive)
State Accepted
Delegated to: Jason Gunthorpe
Series [v8,for-next] RDMA/rxe: Add workqueue support for tasks

Commit Message

Bob Pearson April 28, 2023, 5:13 p.m. UTC
Replace tasklets by work queues for the three main rxe tasklets:
rxe_requester, rxe_completer and rxe_responder.

Rebased to the current for-next branch with the changes below applied.

Link: https://lore.kernel.org/linux-rdma/20230329193308.7489-1-rpearsonhpe@gmail.com/
Signed-off-by: Ian Ziemba <ian.ziemba@hpe.com>
Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>
---
v8:
  Corrected a soft cpu lockup by testing return value from task->func
  for all task states.
  Removed WQ_CPU_INTENSIVE flag from alloc_workqueue() since documentation
  shows that this has no effect if WQ_UNBOUND is set.
  Removed work_pending() call in __reserve_if_idle() since by design
  a task cannot be pending and idle at the same time.
  Renamed __do_task() to do_work() per a comment by Daisuke Matsuda.
v7:
  Adjusted so patch applies after changes to rxe_task.c.
v6:
  Fixed left over references to tasklets in the comments.
  Added WQ_UNBOUND to the parameters for alloc_workqueue(). This shows
  a significant performance improvement.
v5:
  Based on corrected task logic for tasklets and simplified to only
  convert from tasklets to workqueues and not provide a flexible
  interface.
---
 drivers/infiniband/sw/rxe/rxe.c      |   9 ++-
 drivers/infiniband/sw/rxe/rxe_task.c | 108 +++++++++++++++------------
 drivers/infiniband/sw/rxe/rxe_task.h |   6 +-
 3 files changed, 75 insertions(+), 48 deletions(-)


base-commit: 531094dc7164718d28ebb581d729807d7e846363

Comments

Daisuke Matsuda (Fujitsu) May 8, 2023, 8:18 a.m. UTC | #1
On Sat, April 29, 2023 2:13 AM Bob Pearson wrote:
> 
> Replace tasklets by work queues for the three main rxe tasklets:
> rxe_requester, rxe_completer and rxe_responder.
> 
> Rebased to the current for-next branch with the changes below applied.
> 
> Link: https://lore.kernel.org/linux-rdma/20230329193308.7489-1-rpearsonhpe@gmail.com/
> Signed-off-by: Ian Ziemba <ian.ziemba@hpe.com>
> Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>

Sorry for being late. I've been away for the national holidays.
The change looks good.

Reviewed-by: Daisuke Matsuda <matsuda-daisuke@fujitsu.com>
Tested-by: Daisuke Matsuda <matsuda-daisuke@fujitsu.com>

BTW, I have something to discuss that is not directly related to the
inclusion of this patch. IMO, the patch should go into the tree regardless.

Regardless of this patch, the rxe responder currently *never* defers
any tasks other than RC_RDMA_READ_REQUEST ones. This means that, under
heavy load, the responder can keep holding a CPU for a long time,
throttling other softirqs/tasklets in the system.

Here is how it happens. When a request arrives at the responder,
the task is scheduled if other packets are already being processed.
=====
void rxe_resp_queue_pkt(struct rxe_qp *qp, struct sk_buff *skb)
<...>
        must_sched = (pkt->opcode == IB_OPCODE_RC_RDMA_READ_REQUEST) ||
                        (skb_queue_len(&qp->req_pkts) > 1); // ### tasks are being processed.

        if (must_sched)
                rxe_sched_task(&qp->resp.task);
        else
                rxe_run_task(&qp->resp.task);
}
=====
However, if the task is already being processed (i.e. state ==
TASK_STATE_BUSY), it is never queued to the work queue; the new work
is instead consumed in the existing context.
=====
void rxe_sched_task(struct rxe_task *task)
<...>
        spin_lock_irqsave(&task->lock, flags);
        if (__reserve_if_idle(task)) // ### this ALWAYS returns false 
                queue_work(rxe_wq, &task->work);
---
static bool __reserve_if_idle(struct rxe_task *task)
<...>
        if (task->state == TASK_STATE_IDLE) {
                rxe_get(task->qp);
                task->state = TASK_STATE_BUSY;
                task->num_sched++;
                return true;
        }

        if (task->state == TASK_STATE_BUSY)
                task->state = TASK_STATE_ARMED; // ### used in do_task()

        return false;
=====
This behavior is justifiable in that it improves performance. However,
do_task() can hold a CPU until up to RXE_MAX_ITERATIONS (=1024)
iterations have been processed in a single pass, and under heavy load
this can happen consecutively. I would like to know what you guys think
about this.
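
To make the trade-off concrete, below is a minimal, untested sketch of
one possible mitigation: drop the must_sched heuristic and always defer
responder work to the workqueue. The skb_queue_tail() enqueue step is
my assumption about the code elided above; the rest uses only the
functions already shown.
=====
/* Untested sketch, not part of this patch: unconditionally defer
 * responder work to the workqueue instead of sometimes running it
 * inline in the caller's context. do_task() already caps each pass
 * at RXE_MAX_ITERATIONS and reschedules itself when work remains,
 * so other work items can be interleaved between passes.
 */
void rxe_resp_queue_pkt(struct rxe_qp *qp, struct sk_buff *skb)
{
	/* enqueue step assumed from the elided driver code */
	skb_queue_tail(&qp->req_pkts, skb);

	/* always schedule; never run in the current context */
	rxe_sched_task(&qp->resp.task);
}
=====
The cost would be an extra scheduling hop for lightly loaded QPs, which
is exactly the performance question raised above.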

Thanks,
Daisuke


> ---
> v8:
>   Corrected a soft cpu lockup by testing return value from task->func
>   for all task states.
>   Removed WQ_CPU_INTENSIVE flag from alloc_workqueue() since documentation
>   shows that this has no effect if WQ_UNBOUND is set.
>   Removed work_pending() call in __reserve_if_idle() since by design
>   a task cannot be pending and idle at the same time.
>   Renamed __do_task() to do_work() per a comment by Daisuke Matsuda.
> v7:
>   Adjusted so patch applies after changes to rxe_task.c.
> v6:
>   Fixed left over references to tasklets in the comments.
>   Added WQ_UNBOUND to the parameters for alloc_workqueue(). This shows
>   a significant performance improvement.
> v5:
>   Based on corrected task logic for tasklets and simplified to only
>   convert from tasklets to workqueues and not provide a flexible
>   interface.
> ---
>  drivers/infiniband/sw/rxe/rxe.c      |   9 ++-
>  drivers/infiniband/sw/rxe/rxe_task.c | 108 +++++++++++++++------------
>  drivers/infiniband/sw/rxe/rxe_task.h |   6 +-
>  3 files changed, 75 insertions(+), 48 deletions(-)
> 
> [full patch body snipped; identical to the Patch section at the end of this page]
Jason Gunthorpe May 17, 2023, 6:38 p.m. UTC | #2
On Fri, Apr 28, 2023 at 12:13:22PM -0500, Bob Pearson wrote:
> Replace tasklets by work queues for the three main rxe tasklets:
> rxe_requester, rxe_completer and rxe_responder.
> 
> Rebased to the current for-next branch with the changes below applied.
> 
> Link: https://lore.kernel.org/linux-rdma/20230329193308.7489-1-rpearsonhpe@gmail.com/
> Signed-off-by: Ian Ziemba <ian.ziemba@hpe.com>
> Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>
> Reviewed-by: Daisuke Matsuda <matsuda-daisuke@fujitsu.com>
> Tested-by: Daisuke Matsuda <matsuda-daisuke@fujitsu.com>
> ---
> v8:
>   Corrected a soft cpu lockup by testing return value from task->func
>   for all task states.
>   Removed WQ_CPU_INTENSIVE flag from alloc_workqueue() since documentation
>   shows that this has no effect if WQ_UNBOUND is set.
>   Removed work_pending() call in __reserve_if_idle() since by design
>   a task cannot be pending and idle at the same time.
>   Renamed __do_task() to do_work() per a comment by Daisuke Matsuda.
> v7:
>   Adjusted so patch applies after changes to rxe_task.c.
> v6:
>   Fixed left over references to tasklets in the comments.
>   Added WQ_UNBOUND to the parameters for alloc_workqueue(). This shows
>   a significant performance improvement.
> v5:
>   Based on corrected task logic for tasklets and simplified to only
>   convert from tasklets to workqueues and not provide a flexible
>   interface.
> ---
>  drivers/infiniband/sw/rxe/rxe.c      |   9 ++-
>  drivers/infiniband/sw/rxe/rxe_task.c | 108 +++++++++++++++------------
>  drivers/infiniband/sw/rxe/rxe_task.h |   6 +-
>  3 files changed, 75 insertions(+), 48 deletions(-)

Applied to for-next, thanks

Jason
Bob Pearson May 17, 2023, 7:20 p.m. UTC | #3
On 5/17/23 13:38, Jason Gunthorpe wrote:
> On Fri, Apr 28, 2023 at 12:13:22PM -0500, Bob Pearson wrote:
>> Replace tasklets by work queues for the three main rxe tasklets:
>> rxe_requester, rxe_completer and rxe_responder.
>>
>> Rebased to the current for-next branch with the changes below applied.
>>
>> Link: https://lore.kernel.org/linux-rdma/20230329193308.7489-1-rpearsonhpe@gmail.com/
>> Signed-off-by: Ian Ziemba <ian.ziemba@hpe.com>
>> Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>
>> Reviewed-by: Daisuke Matsuda <matsuda-daisuke@fujitsu.com>
>> Tested-by: Daisuke Matsuda <matsuda-daisuke@fujitsu.com>
>> ---
>> v8:
>>   Corrected a soft cpu lockup by testing return value from task->func
>>   for all task states.
>>   Removed WQ_CPU_INTENSIVE flag from alloc_workqueue() since documentation
>>   shows that this has no effect if WQ_UNBOUND is set.
>>   Removed work_pending() call in __reserve_if_idle() since by design
>>   a task cannot be pending and idle at the same time.
>>   Renamed __do_task() to do_work() per a comment by Daisuke Matsuda.
>> v7:
>>   Adjusted so patch applies after changes to rxe_task.c.
>> v6:
>>   Fixed left over references to tasklets in the comments.
>>   Added WQ_UNBOUND to the parameters for alloc_workqueue(). This shows
>>   a significant performance improvement.
>> v5:
>>   Based on corrected task logic for tasklets and simplified to only
>>   convert from tasklets to workqueues and not provide a flexible
>>   interface.
>> ---
>>  drivers/infiniband/sw/rxe/rxe.c      |   9 ++-
>>  drivers/infiniband/sw/rxe/rxe_task.c | 108 +++++++++++++++------------
>>  drivers/infiniband/sw/rxe/rxe_task.h |   6 +-
>>  3 files changed, 75 insertions(+), 48 deletions(-)
> 
> Applied to for-next, thanks
> 
> Jason
Thanks!!

Bob

Patch

diff --git a/drivers/infiniband/sw/rxe/rxe.c b/drivers/infiniband/sw/rxe/rxe.c
index 7a7e713de52d..54c723a6edda 100644
--- a/drivers/infiniband/sw/rxe/rxe.c
+++ b/drivers/infiniband/sw/rxe/rxe.c
@@ -212,10 +212,16 @@  static int __init rxe_module_init(void)
 {
 	int err;
 
-	err = rxe_net_init();
+	err = rxe_alloc_wq();
 	if (err)
 		return err;
 
+	err = rxe_net_init();
+	if (err) {
+		rxe_destroy_wq();
+		return err;
+	}
+
 	rdma_link_register(&rxe_link_ops);
 	pr_info("loaded\n");
 	return 0;
@@ -226,6 +232,7 @@  static void __exit rxe_module_exit(void)
 	rdma_link_unregister(&rxe_link_ops);
 	ib_unregister_driver(RDMA_DRIVER_RXE);
 	rxe_net_exit();
+	rxe_destroy_wq();
 
 	pr_info("unloaded\n");
 }
diff --git a/drivers/infiniband/sw/rxe/rxe_task.c b/drivers/infiniband/sw/rxe/rxe_task.c
index fb9a6bc8e620..e2c13d3d0e47 100644
--- a/drivers/infiniband/sw/rxe/rxe_task.c
+++ b/drivers/infiniband/sw/rxe/rxe_task.c
@@ -6,8 +6,24 @@ 
 
 #include "rxe.h"
 
+static struct workqueue_struct *rxe_wq;
+
+int rxe_alloc_wq(void)
+{
+	rxe_wq = alloc_workqueue("rxe_wq", WQ_UNBOUND, WQ_MAX_ACTIVE);
+	if (!rxe_wq)
+		return -ENOMEM;
+
+	return 0;
+}
+
+void rxe_destroy_wq(void)
+{
+	destroy_workqueue(rxe_wq);
+}
+
 /* Check if task is idle i.e. not running, not scheduled in
- * tasklet queue and not draining. If so move to busy to
+ * work queue and not draining. If so move to busy to
  * reserve a slot in do_task() by setting to busy and taking
  * a qp reference to cover the gap from now until the task finishes.
  * state will move out of busy if task returns a non zero value
@@ -21,9 +37,6 @@  static bool __reserve_if_idle(struct rxe_task *task)
 {
 	WARN_ON(rxe_read(task->qp) <= 0);
 
-	if (task->tasklet.state & BIT(TASKLET_STATE_SCHED))
-		return false;
-
 	if (task->state == TASK_STATE_IDLE) {
 		rxe_get(task->qp);
 		task->state = TASK_STATE_BUSY;
@@ -38,7 +51,7 @@  static bool __reserve_if_idle(struct rxe_task *task)
 }
 
 /* check if task is idle or drained and not currently
- * scheduled in the tasklet queue. This routine is
+ * scheduled in the work queue. This routine is
  * called by rxe_cleanup_task or rxe_disable_task to
  * see if the queue is empty.
  * Context: caller should hold task->lock.
@@ -46,7 +59,7 @@  static bool __reserve_if_idle(struct rxe_task *task)
  */
 static bool __is_done(struct rxe_task *task)
 {
-	if (task->tasklet.state & BIT(TASKLET_STATE_SCHED))
+	if (work_pending(&task->work))
 		return false;
 
 	if (task->state == TASK_STATE_IDLE ||
@@ -77,23 +90,23 @@  static bool is_done(struct rxe_task *task)
  * schedules the task. They must call __reserve_if_idle to
  * move the task to busy before calling or scheduling.
  * The task can also be moved to drained or invalid
- * by calls to rxe-cleanup_task or rxe_disable_task.
+ * by calls to rxe_cleanup_task or rxe_disable_task.
  * In that case tasks which get here are not executed but
  * just flushed. The tasks are designed to look to see if
- * there is work to do and do part of it before returning
+ * there is work to do and then do part of it before returning
  * here with a return value of zero until all the work
- * has been consumed then it retuens a non-zero value.
+ * has been consumed then it returns a non-zero value.
  * The number of times the task can be run is limited by
  * max iterations so one task cannot hold the cpu forever.
+ * If the limit is hit and work remains the task is rescheduled.
  */
-static void do_task(struct tasklet_struct *t)
+static void do_task(struct rxe_task *task)
 {
-	int cont;
-	int ret;
-	struct rxe_task *task = from_tasklet(task, t, tasklet);
 	unsigned int iterations;
 	unsigned long flags;
 	int resched = 0;
+	int cont;
+	int ret;
 
 	WARN_ON(rxe_read(task->qp) <= 0);
 
@@ -115,25 +128,22 @@  static void do_task(struct tasklet_struct *t)
 		} while (ret == 0 && iterations-- > 0);
 
 		spin_lock_irqsave(&task->lock, flags);
+		/* we're not done yet but we ran out of iterations.
+		 * yield the cpu and reschedule the task
+		 */
+		if (!ret) {
+			task->state = TASK_STATE_IDLE;
+			resched = 1;
+			goto exit;
+		}
+
 		switch (task->state) {
 		case TASK_STATE_BUSY:
-			if (ret) {
-				task->state = TASK_STATE_IDLE;
-			} else {
-				/* This can happen if the client
-				 * can add work faster than the
-				 * tasklet can finish it.
-				 * Reschedule the tasklet and exit
-				 * the loop to give up the cpu
-				 */
-				task->state = TASK_STATE_IDLE;
-				resched = 1;
-			}
+			task->state = TASK_STATE_IDLE;
 			break;
 
-		/* someone tried to run the task since the last time we called
-		 * func, so we will call one more time regardless of the
-		 * return value
+		/* someone tried to schedule the task while we
+		 * were running, keep going
 		 */
 		case TASK_STATE_ARMED:
 			task->state = TASK_STATE_BUSY;
@@ -141,21 +151,23 @@  static void do_task(struct tasklet_struct *t)
 			break;
 
 		case TASK_STATE_DRAINING:
-			if (ret)
-				task->state = TASK_STATE_DRAINED;
-			else
-				cont = 1;
+			task->state = TASK_STATE_DRAINED;
 			break;
 
 		default:
 			WARN_ON(1);
-			rxe_info_qp(task->qp, "unexpected task state = %d", task->state);
+			rxe_dbg_qp(task->qp, "unexpected task state = %d",
+				   task->state);
+			task->state = TASK_STATE_IDLE;
 		}
 
+exit:
 		if (!cont) {
 			task->num_done++;
 			if (WARN_ON(task->num_done != task->num_sched))
-				rxe_err_qp(task->qp, "%ld tasks scheduled, %ld tasks done",
+				rxe_dbg_qp(task->qp,
+					   "%ld tasks scheduled, "
+					   "%ld tasks done",
 					   task->num_sched, task->num_done);
 		}
 		spin_unlock_irqrestore(&task->lock, flags);
@@ -169,6 +181,12 @@  static void do_task(struct tasklet_struct *t)
 	rxe_put(task->qp);
 }
 
+/* wrapper around do_task to fix argument for work queue */
+static void do_work(struct work_struct *work)
+{
+	do_task(container_of(work, struct rxe_task, work));
+}
+
 int rxe_init_task(struct rxe_task *task, struct rxe_qp *qp,
 		  int (*func)(struct rxe_qp *))
 {
@@ -176,11 +194,9 @@  int rxe_init_task(struct rxe_task *task, struct rxe_qp *qp,
 
 	task->qp = qp;
 	task->func = func;
-
-	tasklet_setup(&task->tasklet, do_task);
-
 	task->state = TASK_STATE_IDLE;
 	spin_lock_init(&task->lock);
+	INIT_WORK(&task->work, do_work);
 
 	return 0;
 }
@@ -213,8 +229,6 @@  void rxe_cleanup_task(struct rxe_task *task)
 	while (!is_done(task))
 		cond_resched();
 
-	tasklet_kill(&task->tasklet);
-
 	spin_lock_irqsave(&task->lock, flags);
 	task->state = TASK_STATE_INVALID;
 	spin_unlock_irqrestore(&task->lock, flags);
@@ -226,7 +240,7 @@  void rxe_cleanup_task(struct rxe_task *task)
 void rxe_run_task(struct rxe_task *task)
 {
 	unsigned long flags;
-	int run;
+	bool run;
 
 	WARN_ON(rxe_read(task->qp) <= 0);
 
@@ -235,11 +249,11 @@  void rxe_run_task(struct rxe_task *task)
 	spin_unlock_irqrestore(&task->lock, flags);
 
 	if (run)
-		do_task(&task->tasklet);
+		do_task(task);
 }
 
-/* schedule the task to run later as a tasklet.
- * the tasklet)schedule call can be called holding
+/* schedule the task to run later as a work queue entry.
+ * the queue_work call can be called holding
  * the lock.
  */
 void rxe_sched_task(struct rxe_task *task)
@@ -250,7 +264,7 @@  void rxe_sched_task(struct rxe_task *task)
 
 	spin_lock_irqsave(&task->lock, flags);
 	if (__reserve_if_idle(task))
-		tasklet_schedule(&task->tasklet);
+		queue_work(rxe_wq, &task->work);
 	spin_unlock_irqrestore(&task->lock, flags);
 }
 
@@ -277,7 +291,9 @@  void rxe_disable_task(struct rxe_task *task)
 	while (!is_done(task))
 		cond_resched();
 
-	tasklet_disable(&task->tasklet);
+	spin_lock_irqsave(&task->lock, flags);
+	task->state = TASK_STATE_DRAINED;
+	spin_unlock_irqrestore(&task->lock, flags);
 }
 
 void rxe_enable_task(struct rxe_task *task)
@@ -291,7 +307,7 @@  void rxe_enable_task(struct rxe_task *task)
 		spin_unlock_irqrestore(&task->lock, flags);
 		return;
 	}
+
 	task->state = TASK_STATE_IDLE;
-	tasklet_enable(&task->tasklet);
 	spin_unlock_irqrestore(&task->lock, flags);
 }
diff --git a/drivers/infiniband/sw/rxe/rxe_task.h b/drivers/infiniband/sw/rxe/rxe_task.h
index facb7c8e3729..a63e258b3d66 100644
--- a/drivers/infiniband/sw/rxe/rxe_task.h
+++ b/drivers/infiniband/sw/rxe/rxe_task.h
@@ -22,7 +22,7 @@  enum {
  * called again.
  */
 struct rxe_task {
-	struct tasklet_struct	tasklet;
+	struct work_struct	work;
 	int			state;
 	spinlock_t		lock;
 	struct rxe_qp		*qp;
@@ -32,6 +32,10 @@  struct rxe_task {
 	long			num_done;
 };
 
+int rxe_alloc_wq(void);
+
+void rxe_destroy_wq(void);
+
 /*
  * init rxe_task structure
  *	qp  => parameter to pass to func