[v2] IB/rxe: assign QPs to cores for transmit scaling

Message ID 20180710023850.GA3096@attalasystems.com (mailing list archive)
State Rejected

Commit Message

Vijay Immanuel July 10, 2018, 2:38 a.m. UTC
A QP must use the same NIC TX queue to maintain packet order. The TX
queue is usually selected based on the core from which the transmit
originated.
Assign QPs to cores to better spread traffic across TX queues. This
requires scheduling the tasklets on the CPU assigned to the QP. The
transmit CPU is selected based on the source QPN.

Signed-off-by: Vijay Immanuel <vijayi@attalasystems.com>
---
Changes in v2:
  - Removed use of comp_vector for selecting the core.
  - Removed scheduling the req task for user QPs.

 drivers/infiniband/sw/rxe/rxe_comp.c |  8 +------
 drivers/infiniband/sw/rxe/rxe_qp.c   | 10 +++++---
 drivers/infiniband/sw/rxe/rxe_resp.c |  8 +------
 drivers/infiniband/sw/rxe/rxe_task.c | 45 +++++++++++++++++++++++++++++++++---
 drivers/infiniband/sw/rxe/rxe_task.h |  7 +++++-
 5 files changed, 57 insertions(+), 21 deletions(-)
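
The CPU selection rule itself is a one-liner: qp_cpu = qp_num(qp) %
num_online_cpus(). A minimal userspace sketch of how it spreads QPs across
cores - sysconf(_SC_NPROCESSORS_ONLN) stands in for the kernel's
num_online_cpus(), and the QPN values are invented for illustration:

#include <stdio.h>
#include <unistd.h>

int main(void)
{
	/* Mirrors qp_cpu = qp_num(qp) % num_online_cpus() from the patch. */
	long ncpus = sysconf(_SC_NPROCESSORS_ONLN);
	unsigned int qpns[] = { 17, 18, 19, 20, 21, 22, 23, 24 };
	unsigned int i;

	for (i = 0; i < sizeof(qpns) / sizeof(qpns[0]); i++)
		printf("QPN %u -> CPU %ld\n", qpns[i],
		       (long)(qpns[i] % ncpus));
	return 0;
}

Consecutive QPNs land on consecutive cores, so with several active QPs the
requester, completer and responder tasklets - and hence the TX queue
selection - spread across all online CPUs.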

Comments

Yuval Shaia July 16, 2018, 9:04 p.m. UTC | #1
On Mon, Jul 09, 2018 at 07:38:53PM -0700, Vijay Immanuel wrote:
> A QP must use the same NIC TX queue to maintain packet order. The TX
> queue is usually selected based on the core from which the transmit
> originated.
> Assign QPs to cores to better spread traffic across TX queues. This
> requires scheduling the tasklets on the CPU assigned to the QP. The
> transmit CPU is selected based on the source QPN.

Can you share test results?

Vijay Immanuel July 18, 2018, 8:51 a.m. UTC | #2
On Tue, Jul 17, 2018 at 12:04:47AM +0300, Yuval Shaia wrote:
> On Mon, Jul 09, 2018 at 07:38:53PM -0700, Vijay Immanuel wrote:
> > A QP must use the same NIC TX queue to maintain packet order. The TX
> > queue is usually selected based on the core from which the transmit
> > originated.
> > Assign QPs to cores to better spread traffic across TX queues. This
> > requires scheduling the tasklets on the CPU assigned to the QP. The
> > transmit CPU is selected based on the source QPN.
> 
> Can you share test results?

I'm running NVMeoF host and target over RXE, single SSD, 40Gbps network, one switch.
IOPS (FIO 4K random reads, qd 64, jobs 4): 80,000 (w/o patch), 141,000 (w/ patch).
Latency (FIO 4K random reads, qd 1, jobs 1): 173 us (w/o patch), 180 us (w/ patch).
Sequential read and write bandwidth was similar in either case, about 1,100 MB/s.

Yuval Shaia July 18, 2018, 1:16 p.m. UTC | #3
On Wed, Jul 18, 2018 at 01:51:20AM -0700, Vijay Immanuel wrote:
> On Tue, Jul 17, 2018 at 12:04:47AM +0300, Yuval Shaia wrote:
> > On Mon, Jul 09, 2018 at 07:38:53PM -0700, Vijay Immanuel wrote:
> > > A QP must use the same NIC TX queue to maintain packet order. The TX
> > > queue is usually selected based on the core from which the transmit
> > > originated.
> > > Assign QPs to cores to better spread traffic across TX queues. This
> > > requires scheduling the tasklets on the CPU assigned to the QP. The
> > > transmit CPU is selected based on the source QPN.
> > 
> > Can you share test results?

Hi Vijay,

> 
> I'm running NVMeoF host and target over RXE, single SSD, 40Gbps network, one switch.
> IOPS (FIO 4K random reads, qd 64, jobs 4): 80,000 (w/o patch), 141,000 (w/ patch).
> Latency (FIO 4K random reads, qd 1, jobs 1): 173 us (w/o patch), 180 us (w/ patch).
> Sequential read and write bandwidth was similar in either case, about 1,100 MB/s.

I tried this patch with a much simpler setup - a QEMU VM with 4 CPUs and
4G RAM.
Unfortunately I was not happy with the results; maybe I'm using the wrong
tools for performance measurement, but what I saw is the opposite: with
this patch the numbers were worse than without.
I ran ibv_rc_pingpong with server and client residing on the same VM, with
4k and 1m message sizes, and got this:

	w/o	w/
---------------------
4k:	~19	~31
1m:	~3847	~5647

Results are in usec/iter.

Regardless of this, I briefly reviewed the patch and found no logical
mistakes in it, but still wanted to put the results on the table.

Yuval

Vijay Immanuel July 19, 2018, 9:28 p.m. UTC | #4
On Wed, Jul 18, 2018 at 04:16:03PM +0300, Yuval Shaia wrote:
> On Wed, Jul 18, 2018 at 01:51:20AM -0700, Vijay Immanuel wrote:
> > On Tue, Jul 17, 2018 at 12:04:47AM +0300, Yuval Shaia wrote:
> > > On Mon, Jul 09, 2018 at 07:38:53PM -0700, Vijay Immanuel wrote:
> > > > A QP must use the same NIC TX queue to maintain packet order. The TX
> > > > queue is usually selected based on the core from which the transmit
> > > > originated.
> > > > Assign QPs to cores to better spread traffic across TX queues. This
> > > > requires scheduling the tasklets on the CPU assigned to the QP. The
> > > > transmit CPU is selected based on the source QPN.
> > > 
> > > Can you share test results?
> 
> Hi Vijay,
> 
> > 
> > I'm running NVMeoF host and target over RXE, single SSD, 40Gbps network, one switch.
> > > IOPS (FIO 4K random reads, qd 64, jobs 4): 80,000 (w/o patch), 141,000 (w/ patch).
> > > Latency (FIO 4K random reads, qd 1, jobs 1): 173 us (w/o patch), 180 us (w/ patch).
> > > Sequential read and write bandwidth was similar in either case, about 1,100 MB/s.
> 
> > I tried this patch with a much simpler setup - a QEMU VM with 4 CPUs and
> > 4G RAM.
> > Unfortunately I was not happy with the results; maybe I'm using the wrong
> > tools for performance measurement, but what I saw is the opposite: with
> > this patch the numbers were worse than without.
> > I ran ibv_rc_pingpong with server and client residing on the same VM,
> > with 4k and 1m message sizes, and got this:
> 
> 	w/o	w/
> ---------------------
> 4k:	~19	~31
> 1m:	~3847	~5647
> 
> > Results are in usec/iter.
> 
> > Regardless of this, I briefly reviewed the patch and found no logical
> > mistakes in it, but still wanted to put the results on the table.
> 
> Yuval
> 

Hi Yuval,

Thanks for trying this patch and bringing up the issue.
Your results are not surprising given that this patch always schedules
the responder and completer tasks, whereas previously they would have
run in the caller's context for a ping-pong type of application. 
I'll run some more tests to see if we can make things better.
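
For context, the inline path referred to here is the code the patch removes
from rxe_run_task(), excerpted below from the diff at the bottom of this
page, with an added comment:

	/* Pre-patch behaviour: a caller passing sched == 0 ran the task in
	 * its own context, so a pingpong's single in-flight packet paid no
	 * tasklet (or cross-CPU) round trip. The patch makes
	 * rxe_comp_queue_pkt() and rxe_resp_queue_pkt() always pass 1.
	 */
	if (sched)
		tasklet_schedule(&task->tasklet);
	else
		rxe_do_task((unsigned long)task);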

Vijay Immanuel July 26, 2018, 2:31 a.m. UTC | #5
On Thu, Jul 19, 2018 at 02:28:48PM -0700, Vijay Immanuel wrote:
> On Wed, Jul 18, 2018 at 04:16:03PM +0300, Yuval Shaia wrote:
> > On Wed, Jul 18, 2018 at 01:51:20AM -0700, Vijay Immanuel wrote:
> > > On Tue, Jul 17, 2018 at 12:04:47AM +0300, Yuval Shaia wrote:
> > > > On Mon, Jul 09, 2018 at 07:38:53PM -0700, Vijay Immanuel wrote:
> > > > > A QP must use the same NIC TX queue to maintain packet order. The TX
> > > > > queue is usually selected based on the core from which the transmit
> > > > > originated.
> > > > > Assign QPs to cores to better spread traffic across TX queues. This
> > > > > requires scheduling the tasklets on the CPU assigned to the QP. The
> > > > > transmit CPU is selected based on the source QPN.
> > > > 
> > > > Can you share test results?
> > 
> > Hi Vijay,
> > 
> > > 
> > > I'm running NVMeoF host and target over RXE, single SSD, 40Gbps network, one switch.
> > > IOPS (FIO 4K random reads, qd 64, jobs 4): 80,000 (w/o patch), 141,000 (w/ patch).
> > > Latency (FIO 4K random reads, qd 1, jobs 1): 173 us (w/o patch), 180 us (w/ patch).
> > > Sequential read and write bandwidth was similar in either case, about 1,100 MB/s.
> > 
> > I tried this patch with a much simpler setup - a QEMU VM with 4 CPUs and
> > 4G RAM.
> > Unfortunately I was not happy with the results; maybe I'm using the wrong
> > tools for performance measurement, but what I saw is the opposite: with
> > this patch the numbers were worse than without.
> > I ran ibv_rc_pingpong with server and client residing on the same VM,
> > with 4k and 1m message sizes, and got this:
> > 
> > 	w/o	w/
> > ---------------------
> > 4k:	~19	~31
> > 1m:	~3847	~5647
> > 
> > Results are in usec/iter.
> > 
> > Regardless of this, I briefly reviewed the patch and found no logical
> > mistakes in it, but still wanted to put the results on the table.
> > 
> > Yuval
> > 
> 
> Hi Yuval,
> 
> Thanks for trying this patch and bringing up the issue.
> Your results are not surprising given that this patch always schedules
> the responder and completer tasks, whereas previously they would have
> run in the caller's context for a ping-pong type of application. 
> I'll run some more tests to see if we can make things better.
> 

Hi Monis and Yuval,

After further experiments, I'm dropping this patch from consideration.
As Yuval pointed out, there is a latency degradation, which is significant
in some applications; I think that outweighs the throughput benefit.
Moreover, the RX scaling patch that I've sent provides a greater improvement
in throughput, which eliminates the need for this patch.
Thanks,

Vijay



Patch

diff --git a/drivers/infiniband/sw/rxe/rxe_comp.c b/drivers/infiniband/sw/rxe/rxe_comp.c
index 6cdc40e..189f80e 100644
--- a/drivers/infiniband/sw/rxe/rxe_comp.c
+++ b/drivers/infiniband/sw/rxe/rxe_comp.c
@@ -149,14 +149,8 @@ void retransmit_timer(struct timer_list *t)
 void rxe_comp_queue_pkt(struct rxe_dev *rxe, struct rxe_qp *qp,
 			struct sk_buff *skb)
 {
-	int must_sched;
-
 	skb_queue_tail(&qp->resp_pkts, skb);
-
-	must_sched = skb_queue_len(&qp->resp_pkts) > 1;
-	if (must_sched != 0)
-		rxe_counter_inc(rxe, RXE_CNT_COMPLETER_SCHED);
-	rxe_run_task(&qp->comp.task, must_sched);
+	rxe_run_task(&qp->comp.task, 1);
 }
 
 static inline enum comp_state get_wqe(struct rxe_qp *qp,
diff --git a/drivers/infiniband/sw/rxe/rxe_qp.c b/drivers/infiniband/sw/rxe/rxe_qp.c
index b9f7aa1..dc11227 100644
--- a/drivers/infiniband/sw/rxe/rxe_qp.c
+++ b/drivers/infiniband/sw/rxe/rxe_qp.c
@@ -221,6 +221,7 @@ static int rxe_qp_init_req(struct rxe_dev *rxe, struct rxe_qp *qp,
 {
 	int err;
 	int wqe_size;
+	int qp_cpu;
 
 	err = sock_create_kern(&init_net, AF_INET, SOCK_DGRAM, 0, &qp->sk);
 	if (err < 0)
@@ -260,9 +261,10 @@ static int rxe_qp_init_req(struct rxe_dev *rxe, struct rxe_qp *qp,
 	spin_lock_init(&qp->sq.sq_lock);
 	skb_queue_head_init(&qp->req_pkts);
 
-	rxe_init_task(rxe, &qp->req.task, qp,
+	qp_cpu = qp_num(qp) % num_online_cpus();
+	rxe_init_task(rxe, &qp->req.task, qp_cpu, qp,
 		      rxe_requester, "req");
-	rxe_init_task(rxe, &qp->comp.task, qp,
+	rxe_init_task(rxe, &qp->comp.task, qp_cpu, qp,
 		      rxe_completer, "comp");
 
 	qp->qp_timeout_jiffies = 0; /* Can't be set for UD/UC in modify_qp */
@@ -280,6 +282,7 @@ static int rxe_qp_init_resp(struct rxe_dev *rxe, struct rxe_qp *qp,
 {
 	int err;
 	int wqe_size;
+	int qp_cpu;
 
 	if (!qp->srq) {
 		qp->rq.max_wr		= init->cap.max_recv_wr;
@@ -311,7 +314,8 @@ static int rxe_qp_init_resp(struct rxe_dev *rxe, struct rxe_qp *qp,
 
 	skb_queue_head_init(&qp->resp_pkts);
 
-	rxe_init_task(rxe, &qp->resp.task, qp,
+	qp_cpu = qp_num(qp) % num_online_cpus();
+	rxe_init_task(rxe, &qp->resp.task, qp_cpu, qp,
 		      rxe_responder, "resp");
 
 	qp->resp.opcode		= OPCODE_NONE;
diff --git a/drivers/infiniband/sw/rxe/rxe_resp.c b/drivers/infiniband/sw/rxe/rxe_resp.c
index 955ff3b..853ab49 100644
--- a/drivers/infiniband/sw/rxe/rxe_resp.c
+++ b/drivers/infiniband/sw/rxe/rxe_resp.c
@@ -107,15 +107,9 @@ static char *resp_state_name[] = {
 void rxe_resp_queue_pkt(struct rxe_dev *rxe, struct rxe_qp *qp,
 			struct sk_buff *skb)
 {
-	int must_sched;
-	struct rxe_pkt_info *pkt = SKB_TO_PKT(skb);
-
 	skb_queue_tail(&qp->req_pkts, skb);
 
-	must_sched = (pkt->opcode == IB_OPCODE_RC_RDMA_READ_REQUEST) ||
-			(skb_queue_len(&qp->req_pkts) > 1);
-
-	rxe_run_task(&qp->resp.task, must_sched);
+	rxe_run_task(&qp->resp.task, 1);
 }
 
 static inline enum resp_states get_req(struct rxe_qp *qp,
diff --git a/drivers/infiniband/sw/rxe/rxe_task.c b/drivers/infiniband/sw/rxe/rxe_task.c
index 08f05ac..8b7cb97 100644
--- a/drivers/infiniband/sw/rxe/rxe_task.c
+++ b/drivers/infiniband/sw/rxe/rxe_task.c
@@ -37,6 +37,8 @@ 
 
 #include "rxe_task.h"
 
+static void rxe_run_task_local(void *data);
+
 int __rxe_do_task(struct rxe_task *task)
 
 {
@@ -63,6 +65,7 @@ void rxe_do_task(unsigned long data)
 	struct rxe_task *task = (struct rxe_task *)data;
 
 	spin_lock_irqsave(&task->state_lock, flags);
+	task->scheduled = false;
 	switch (task->state) {
 	case TASK_STATE_START:
 		task->state = TASK_STATE_BUSY;
@@ -108,20 +111,27 @@ void rxe_do_task(unsigned long data)
 			pr_warn("%s failed with bad state %d\n", __func__,
 				task->state);
 		}
+
+		if (!cont && task->scheduled)
+			tasklet_schedule(&task->tasklet);
 		spin_unlock_irqrestore(&task->state_lock, flags);
 	} while (cont);
 
 	task->ret = ret;
 }
 
-int rxe_init_task(void *obj, struct rxe_task *task,
+int rxe_init_task(void *obj, struct rxe_task *task, int cpu,
 		  void *arg, int (*func)(void *), char *name)
 {
+	task->cpu	= cpu;
 	task->obj	= obj;
 	task->arg	= arg;
 	task->func	= func;
+	task->csd.func  = rxe_run_task_local;
+	task->csd.info  = task;
 	snprintf(task->name, sizeof(task->name), "%s", name);
 	task->destroyed	= false;
+	task->scheduled	= false;
 
 	tasklet_init(&task->tasklet, rxe_do_task, (unsigned long)task);
 
@@ -151,15 +161,44 @@ void rxe_cleanup_task(struct rxe_task *task)
 	tasklet_kill(&task->tasklet);
 }
 
+static void rxe_run_task_local(void *data)
+{
+	struct rxe_task *task = (struct rxe_task *)data;
+
+	if (task->destroyed)
+		return;
+
+	tasklet_schedule(&task->tasklet);
+}
+
 void rxe_run_task(struct rxe_task *task, int sched)
 {
+	int cpu;
+	unsigned long flags;
+
 	if (task->destroyed)
 		return;
 
-	if (sched)
+	if (!sched) {
+		rxe_do_task((unsigned long)task);
+		return;
+	}
+
+	spin_lock_irqsave(&task->state_lock, flags);
+	if (task->scheduled || task->state != TASK_STATE_START) {
+		task->scheduled = true;
+		spin_unlock_irqrestore(&task->state_lock, flags);
+		return;
+	}
+	task->scheduled = true;
+	spin_unlock_irqrestore(&task->state_lock, flags);
+
+	cpu = get_cpu();
+	if (task->cpu == cpu || !cpu_online(task->cpu))
 		tasklet_schedule(&task->tasklet);
 	else
-		rxe_do_task((unsigned long)task);
+		smp_call_function_single_async(task->cpu, &task->csd);
+	put_cpu();
 }
 
 void rxe_disable_task(struct rxe_task *task)
diff --git a/drivers/infiniband/sw/rxe/rxe_task.h b/drivers/infiniband/sw/rxe/rxe_task.h
index 08ff42d..1470dee 100644
--- a/drivers/infiniband/sw/rxe/rxe_task.h
+++ b/drivers/infiniband/sw/rxe/rxe_task.h
@@ -34,6 +34,8 @@ 
 #ifndef RXE_TASK_H
 #define RXE_TASK_H
 
+#include <linux/smp.h>
+
 enum {
 	TASK_STATE_START	= 0,
 	TASK_STATE_BUSY		= 1,
@@ -48,13 +50,16 @@ enum {
 struct rxe_task {
 	void			*obj;
 	struct tasklet_struct	tasklet;
+	int			cpu;
 	int			state;
 	spinlock_t		state_lock; /* spinlock for task state */
 	void			*arg;
 	int			(*func)(void *arg);
+	call_single_data_t	csd;
 	int			ret;
 	char			name[16];
 	bool			destroyed;
+	bool			scheduled;
 };
 
 /*
@@ -62,7 +67,7 @@ struct rxe_task {
  *	arg  => parameter to pass to fcn
  *	fcn  => function to call until it returns != 0
  */
-int rxe_init_task(void *obj, struct rxe_task *task,
+int rxe_init_task(void *obj, struct rxe_task *task, int cpu,
 		  void *arg, int (*func)(void *), char *name);
 
 /* cleanup task */
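
For readers tracing the rxe_task.c changes above, the scheduled/busy
hand-off can be modelled in userspace. This is a sketch only, not driver
code: a pthread mutex stands in for task->state_lock, a busy flag for
TASK_STATE_BUSY, and a direct call for tasklet_schedule().

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

struct task {
	pthread_mutex_t lock;	/* stands in for task->state_lock */
	bool busy;		/* stands in for TASK_STATE_BUSY */
	bool scheduled;		/* the flag the patch introduces */
	int runs;
};

static void do_task(struct task *t);

/* Models the patched rxe_run_task(..., sched = 1). */
static void run_task(struct task *t)
{
	pthread_mutex_lock(&t->lock);
	if (t->scheduled || t->busy) {
		/* Already pending or running: coalesce into one re-run. */
		t->scheduled = true;
		pthread_mutex_unlock(&t->lock);
		return;
	}
	t->scheduled = true;
	pthread_mutex_unlock(&t->lock);
	do_task(t);			/* stands in for tasklet_schedule() */
}

/* Models rxe_do_task(): clear the flag, work, re-run if it was set again. */
static void do_task(struct task *t)
{
	bool again;

	do {
		pthread_mutex_lock(&t->lock);
		t->scheduled = false;
		t->busy = true;
		pthread_mutex_unlock(&t->lock);

		t->runs++;		/* the actual work */
		if (t->runs == 1)
			run_task(t);	/* a "packet" arriving mid-run */

		pthread_mutex_lock(&t->lock);
		t->busy = false;
		again = t->scheduled;	/* patch: !cont && task->scheduled */
		pthread_mutex_unlock(&t->lock);
	} while (again);
}

int main(void)
{
	struct task t = { PTHREAD_MUTEX_INITIALIZER, false, false, 0 };

	run_task(&t);
	printf("runs=%d\n", t.runs);	/* 2: one run plus one coalesced re-run */
	return 0;
}

A request that arrives while the task is busy only guarantees one more pass
rather than running the task concurrently, which keeps per-QP processing
serialized while still allowing rxe_run_task() callers on any CPU.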