[mlx5-next,09/10] net/rds: Handle ODP mr registration/unregistration

Message ID: 20200115124340.79108-10-leon@kernel.org
State: Superseded
Series: Use ODP MRs for kernel ULPs

Commit Message

Leon Romanovsky Jan. 15, 2020, 12:43 p.m. UTC
From: Hans Westgaard Ry <hans.westgaard.ry@oracle.com>

On-Demand-Paging MRs are registered using ib_reg_user_mr and
unregistered with ib_dereg_mr.
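
As a minimal sketch (not part of this patch; the example_* helpers are
hypothetical), a kernel ULP would drive this pair of verbs on an
ODP-capable device roughly as follows:

	#include <rdma/ib_verbs.h>

	/* Register an ODP MR covering [start, start + length). With
	 * IB_ACCESS_ON_DEMAND no pages are pinned up front; faults are
	 * resolved on first access.
	 */
	static struct ib_mr *example_reg_odp_mr(struct ib_pd *pd,
						u64 start, u64 length)
	{
		int access = IB_ACCESS_LOCAL_WRITE | IB_ACCESS_REMOTE_READ |
			     IB_ACCESS_REMOTE_WRITE | IB_ACCESS_ON_DEMAND;

		return ib_reg_user_mr(pd, start, length, start, access);
	}

	/* Tear down; must be called from process context. */
	static void example_dereg_odp_mr(struct ib_mr *mr)
	{
		ib_dereg_mr(mr);
	}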

Signed-off-by: Hans Westgaard Ry <hans.westgaard.ry@oracle.com>
Signed-off-by: Leon Romanovsky <leonro@mellanox.com>
---
 net/rds/ib.c      |   7 +++
 net/rds/ib.h      |   3 +-
 net/rds/ib_mr.h   |   7 ++-
 net/rds/ib_rdma.c |  74 ++++++++++++++++++++++-
 net/rds/ib_send.c |  44 ++++++++++----
 net/rds/rdma.c    | 150 ++++++++++++++++++++++++++++++++++------------
 net/rds/rds.h     |  13 +++-
 7 files changed, 242 insertions(+), 56 deletions(-)

--
2.20.1

Comments

Santosh Shilimkar Jan. 15, 2020, 9:51 p.m. UTC | #1
On 1/15/20 4:43 AM, Leon Romanovsky wrote:
> From: Hans Westgaard Ry <hans.westgaard.ry@oracle.com>
> 
> On-Demand-Paging MRs are registered using ib_reg_user_mr and
> unregistered with ib_dereg_mr.
> 
> Signed-off-by: Hans Westgaard Ry <hans.westgaard.ry@oracle.com>
> Signed-off-by: Leon Romanovsky <leonro@mellanox.com>
> ---

I have already reviewed this patchset on an internal list. A couple of
minor nits below; otherwise the patch looks good to me.

Acked-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>

[...]

> diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c
> index c8c1e3ae8d84..5a02b313ec50 100644
> --- a/net/rds/ib_rdma.c
> +++ b/net/rds/ib_rdma.c
> @@ -37,8 +37,15 @@
> 
>   #include "rds_single_path.h"
>   #include "ib_mr.h"
> +#include "rds.h"
> 
>   struct workqueue_struct *rds_ib_mr_wq;
> +struct rds_ib_dereg_odp_mr {
> +	struct work_struct work;
> +	struct ib_mr *mr;
> +};
> +
> +static void rds_ib_odp_mr_worker(struct work_struct *work);
> 
>   static struct rds_ib_device *rds_ib_get_device(__be32 ipaddr)
>   {
> @@ -213,6 +220,8 @@ void rds_ib_sync_mr(void *trans_private, int direction)
>   	struct rds_ib_mr *ibmr = trans_private;
>   	struct rds_ib_device *rds_ibdev = ibmr->device;
> 
> +	if (ibmr->odp)
> +		return;
Add a new line here.
>   	switch (direction) {
>   	case DMA_FROM_DEVICE:
>   		ib_dma_sync_sg_for_cpu(rds_ibdev->dev, ibmr->sg,

[...]

> diff --git a/net/rds/rdma.c b/net/rds/rdma.c
> index eb23c38ce2b3..3c6afdda709b 100644
> --- a/net/rds/rdma.c
> +++ b/net/rds/rdma.c
> @@ -177,13 +177,14 @@ static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
>   			  struct rds_conn_path *cp)
>   {
>   	struct rds_mr *mr = NULL, *found;
> +	struct scatterlist *sg = NULL;
>   	unsigned int nr_pages;
>   	struct page **pages = NULL;
> -	struct scatterlist *sg;
>   	void *trans_private;
>   	unsigned long flags;
>   	rds_rdma_cookie_t cookie;
> -	unsigned int nents;
> +	unsigned int nents = 0;
> +	int need_odp = 0;
>   	long i;
>   	int ret;
> 
> @@ -196,6 +197,20 @@ static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
>   		ret = -EOPNOTSUPP;
>   		goto out;
>   	}
Add a new line here, please.
Leon Romanovsky Jan. 16, 2020, 7:11 a.m. UTC | #2
On Wed, Jan 15, 2020 at 01:51:23PM -0800, santosh.shilimkar@oracle.com wrote:
> On 1/15/20 4:43 AM, Leon Romanovsky wrote:
> > From: Hans Westgaard Ry <hans.westgaard.ry@oracle.com>
> >
> > On-Demand-Paging MRs are registered using ib_reg_user_mr and
> > unregistered with ib_dereg_mr.
> >
> > Signed-off-by: Hans Westgaard Ry <hans.westgaard.ry@oracle.com>
> > Signed-off-by: Leon Romanovsky <leonro@mellanox.com>
> > ---
>
> I have already reviewed this patchset on an internal list. A couple of
> minor nits below; otherwise the patch looks good to me.
>
> Acked-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>

Thanks Santosh. Once I figure out the apply path for this series,
I will add the extra blank lines while applying the patches.
Santosh Shilimkar Jan. 16, 2020, 7:22 a.m. UTC | #3
On 1/15/20 11:11 PM, Leon Romanovsky wrote:
> On Wed, Jan 15, 2020 at 01:51:23PM -0800, santosh.shilimkar@oracle.com wrote:
>> On 1/15/20 4:43 AM, Leon Romanovsky wrote:
>>> From: Hans Westgaard Ry <hans.westgaard.ry@oracle.com>
>>>
>>> On-Demand-Paging MRs are registered using ib_reg_user_mr and
>>> unregistered with ib_dereg_mr.
>>>
>>> Signed-off-by: Hans Westgaard Ry <hans.westgaard.ry@oracle.com>
>>> Signed-off-by: Leon Romanovsky <leonro@mellanox.com>
>>> ---
>>
>> I have already reviewed this patchset on an internal list. A couple of
>> minor nits below; otherwise the patch looks good to me.
>>
>> Acked-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
> 
> Thanks Santosh. Once I figure out the apply path for this series,
> I will add the extra blank lines while applying the patches.
> 
Sure. Thanks for picking it up!

Regards,
Santosh
Leon Romanovsky Jan. 18, 2020, 10:19 a.m. UTC | #4
On Wed, Jan 15, 2020 at 02:43:39PM +0200, Leon Romanovsky wrote:
> From: Hans Westgaard Ry <hans.westgaard.ry@oracle.com>
>
> On-Demand-Paging MRs are registered using ib_reg_user_mr and
> unregistered with ib_dereg_mr.
>
> Signed-off-by: Hans Westgaard Ry <hans.westgaard.ry@oracle.com>
> Signed-off-by: Leon Romanovsky <leonro@mellanox.com>
> ---

[...]

> @@ -693,6 +764,7 @@ int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm,
>  					min_t(unsigned int, iov->bytes, PAGE_SIZE - offset),
>  					offset);
>
> +			sg->dma_length = sg->length;

This line should be "sg_dma_len(sg) = sg->length;".
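
For reference, the reason the accessor matters: the dma_length field only
exists when CONFIG_NEED_SG_DMA_LENGTH is set, so open-coding sg->dma_length
breaks builds without that option. include/linux/scatterlist.h defines the
macro approximately as:

	#ifdef CONFIG_NEED_SG_DMA_LENGTH
	#define sg_dma_len(sg)		((sg)->dma_length)
	#else
	#define sg_dma_len(sg)		((sg)->length)
	#endif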

Thanks

Patch

diff --git a/net/rds/ib.c b/net/rds/ib.c
index 3fd5f40189bd..a792d8a3872a 100644
--- a/net/rds/ib.c
+++ b/net/rds/ib.c
@@ -156,6 +156,13 @@  static void rds_ib_add_one(struct ib_device *device)
 	has_fmr = (device->ops.alloc_fmr && device->ops.dealloc_fmr &&
 		   device->ops.map_phys_fmr && device->ops.unmap_fmr);
 	rds_ibdev->use_fastreg = (has_fr && !has_fmr);
+	rds_ibdev->odp_capable =
+		!!(device->attrs.device_cap_flags &
+		   IB_DEVICE_ON_DEMAND_PAGING) &&
+		!!(device->attrs.odp_caps.per_transport_caps.rc_odp_caps &
+		   IB_ODP_SUPPORT_WRITE) &&
+		!!(device->attrs.odp_caps.per_transport_caps.rc_odp_caps &
+		   IB_ODP_SUPPORT_READ);

 	rds_ibdev->fmr_max_remaps = device->attrs.max_map_per_fmr?: 32;
 	rds_ibdev->max_1m_mrs = device->attrs.max_mr ?
diff --git a/net/rds/ib.h b/net/rds/ib.h
index 6e6f24753998..0296f1f7acda 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -247,7 +247,8 @@  struct rds_ib_device {
 	struct ib_device	*dev;
 	struct ib_pd		*pd;
 	struct dma_pool		*rid_hdrs_pool; /* RDS headers DMA pool */
-	bool                    use_fastreg;
+	u8			use_fastreg:1;
+	u8			odp_capable:1;

 	unsigned int		max_mrs;
 	struct rds_ib_mr_pool	*mr_1m_pool;
diff --git a/net/rds/ib_mr.h b/net/rds/ib_mr.h
index 9045a8c0edff..0c8252d7fe2b 100644
--- a/net/rds/ib_mr.h
+++ b/net/rds/ib_mr.h
@@ -67,6 +67,7 @@  struct rds_ib_frmr {

 /* This is stored as mr->r_trans_private. */
 struct rds_ib_mr {
+	struct delayed_work		work;
 	struct rds_ib_device		*device;
 	struct rds_ib_mr_pool		*pool;
 	struct rds_ib_connection	*ic;
@@ -81,9 +82,11 @@  struct rds_ib_mr {
 	unsigned int			sg_len;
 	int				sg_dma_len;

+	u8				odp:1;
 	union {
 		struct rds_ib_fmr	fmr;
 		struct rds_ib_frmr	frmr;
+		struct ib_mr		*mr;
 	} u;
 };

@@ -122,12 +125,14 @@  void rds6_ib_get_mr_info(struct rds_ib_device *rds_ibdev,
 void rds_ib_destroy_mr_pool(struct rds_ib_mr_pool *);
 void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
 		    struct rds_sock *rs, u32 *key_ret,
-		    struct rds_connection *conn);
+		    struct rds_connection *conn, u64 start, u64 length,
+		    int need_odp);
 void rds_ib_sync_mr(void *trans_private, int dir);
 void rds_ib_free_mr(void *trans_private, int invalidate);
 void rds_ib_flush_mrs(void);
 int rds_ib_mr_init(void);
 void rds_ib_mr_exit(void);
+u32 rds_ib_get_lkey(void *trans_private);

 void __rds_ib_teardown_mr(struct rds_ib_mr *);
 void rds_ib_teardown_mr(struct rds_ib_mr *);
diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c
index c8c1e3ae8d84..5a02b313ec50 100644
--- a/net/rds/ib_rdma.c
+++ b/net/rds/ib_rdma.c
@@ -37,8 +37,15 @@ 

 #include "rds_single_path.h"
 #include "ib_mr.h"
+#include "rds.h"

 struct workqueue_struct *rds_ib_mr_wq;
+struct rds_ib_dereg_odp_mr {
+	struct work_struct work;
+	struct ib_mr *mr;
+};
+
+static void rds_ib_odp_mr_worker(struct work_struct *work);

 static struct rds_ib_device *rds_ib_get_device(__be32 ipaddr)
 {
@@ -213,6 +220,8 @@  void rds_ib_sync_mr(void *trans_private, int direction)
 	struct rds_ib_mr *ibmr = trans_private;
 	struct rds_ib_device *rds_ibdev = ibmr->device;

+	if (ibmr->odp)
+		return;
 	switch (direction) {
 	case DMA_FROM_DEVICE:
 		ib_dma_sync_sg_for_cpu(rds_ibdev->dev, ibmr->sg,
@@ -482,6 +491,16 @@  void rds_ib_free_mr(void *trans_private, int invalidate)

 	rdsdebug("RDS/IB: free_mr nents %u\n", ibmr->sg_len);

+	if (ibmr->odp) {
+		/* An MR created and marked as use_once. We use delayed work
+		 * because there is a chance that we are in interrupt context
+		 * and can't call ib_dereg_mr() directly.
+		 */
+		INIT_DELAYED_WORK(&ibmr->work, rds_ib_odp_mr_worker);
+		queue_delayed_work(rds_ib_mr_wq, &ibmr->work, 0);
+		return;
+	}
+
 	/* Return it to the pool's free list */
 	if (rds_ibdev->use_fastreg)
 		rds_ib_free_frmr_list(ibmr);
@@ -526,9 +545,17 @@  void rds_ib_flush_mrs(void)
 	up_read(&rds_ib_devices_lock);
 }

+u32 rds_ib_get_lkey(void *trans_private)
+{
+	struct rds_ib_mr *ibmr = trans_private;
+
+	return ibmr->u.mr->lkey;
+}
+
 void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
 		    struct rds_sock *rs, u32 *key_ret,
-		    struct rds_connection *conn)
+		    struct rds_connection *conn,
+		    u64 start, u64 length, int need_odp)
 {
 	struct rds_ib_device *rds_ibdev;
 	struct rds_ib_mr *ibmr = NULL;
@@ -541,6 +568,42 @@  void *rds_ib_get_mr(struct scatterlist *sg, unsigned long nents,
 		goto out;
 	}

+	if (need_odp == ODP_ZEROBASED || need_odp == ODP_VIRTUAL) {
+		u64 virt_addr = need_odp == ODP_ZEROBASED ? 0 : start;
+		int access_flags =
+			(IB_ACCESS_LOCAL_WRITE | IB_ACCESS_REMOTE_READ |
+			 IB_ACCESS_REMOTE_WRITE | IB_ACCESS_REMOTE_ATOMIC |
+			 IB_ACCESS_ON_DEMAND);
+		struct ib_mr *ib_mr;
+
+		if (!rds_ibdev->odp_capable) {
+			ret = -EOPNOTSUPP;
+			goto out;
+		}
+
+		ib_mr = ib_reg_user_mr(rds_ibdev->pd, start, length, virt_addr,
+				       access_flags);
+
+		if (IS_ERR(ib_mr)) {
+			rdsdebug("ib_reg_user_mr returned %ld\n",
+				 PTR_ERR(ib_mr));
+			ret = PTR_ERR(ib_mr);
+			goto out;
+		}
+		if (key_ret)
+			*key_ret = ib_mr->rkey;
+
+		ibmr = kzalloc(sizeof(*ibmr), GFP_KERNEL);
+		if (!ibmr) {
+			ib_dereg_mr(ib_mr);
+			ret = -ENOMEM;
+			goto out;
+		}
+		ibmr->u.mr = ib_mr;
+		ibmr->odp = 1;
+		return ibmr;
+	}
+
 	if (conn)
 		ic = conn->c_transport_data;

@@ -629,3 +692,12 @@  void rds_ib_mr_exit(void)
 {
 	destroy_workqueue(rds_ib_mr_wq);
 }
+
+static void rds_ib_odp_mr_worker(struct work_struct  *work)
+{
+	struct rds_ib_mr *ibmr;
+
+	ibmr = container_of(work, struct rds_ib_mr, work.work);
+	ib_dereg_mr(ibmr->u.mr);
+	kfree(ibmr);
+}
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index d1cc1d7778d8..dfe778220657 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -39,6 +39,7 @@ 
 #include "rds_single_path.h"
 #include "rds.h"
 #include "ib.h"
+#include "ib_mr.h"

 /*
  * Convert IB-specific error message to RDS error message and call core
@@ -635,6 +636,7 @@  int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 		send->s_sge[0].addr = ic->i_send_hdrs_dma[pos];

 		send->s_sge[0].length = sizeof(struct rds_header);
+		send->s_sge[0].lkey = ic->i_pd->local_dma_lkey;

 		memcpy(ic->i_send_hdrs[pos], &rm->m_inc.i_hdr,
 		       sizeof(struct rds_header));
@@ -650,6 +652,7 @@  int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 			send->s_sge[1].addr = sg_dma_address(scat);
 			send->s_sge[1].addr += rm->data.op_dmaoff;
 			send->s_sge[1].length = len;
+			send->s_sge[1].lkey = ic->i_pd->local_dma_lkey;

 			bytes_sent += len;
 			rm->data.op_dmaoff += len;
@@ -858,20 +861,29 @@  int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
 	int ret;
 	int num_sge;
 	int nr_sig = 0;
+	u64 odp_addr = op->op_odp_addr;
+	u32 odp_lkey = 0;

 	/* map the op the first time we see it */
-	if (!op->op_mapped) {
-		op->op_count = ib_dma_map_sg(ic->i_cm_id->device,
-					     op->op_sg, op->op_nents, (op->op_write) ?
-					     DMA_TO_DEVICE : DMA_FROM_DEVICE);
-		rdsdebug("ic %p mapping op %p: %d\n", ic, op, op->op_count);
-		if (op->op_count == 0) {
-			rds_ib_stats_inc(s_ib_tx_sg_mapping_failure);
-			ret = -ENOMEM; /* XXX ? */
-			goto out;
+	if (!op->op_odp_mr) {
+		if (!op->op_mapped) {
+			op->op_count =
+				ib_dma_map_sg(ic->i_cm_id->device, op->op_sg,
+					      op->op_nents,
+					      (op->op_write) ? DMA_TO_DEVICE :
+							       DMA_FROM_DEVICE);
+			rdsdebug("ic %p mapping op %p: %d\n", ic, op,
+				 op->op_count);
+			if (op->op_count == 0) {
+				rds_ib_stats_inc(s_ib_tx_sg_mapping_failure);
+				ret = -ENOMEM; /* XXX ? */
+				goto out;
+			}
+			op->op_mapped = 1;
 		}
-
-		op->op_mapped = 1;
+	} else {
+		op->op_count = op->op_nents;
+		odp_lkey = rds_ib_get_lkey(op->op_odp_mr->r_trans_private);
 	}

 	/*
@@ -923,14 +935,20 @@  int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
 		for (j = 0; j < send->s_rdma_wr.wr.num_sge &&
 		     scat != &op->op_sg[op->op_count]; j++) {
 			len = sg_dma_len(scat);
-			send->s_sge[j].addr = sg_dma_address(scat);
+			if (!op->op_odp_mr) {
+				send->s_sge[j].addr = sg_dma_address(scat);
+				send->s_sge[j].lkey = ic->i_pd->local_dma_lkey;
+			} else {
+				send->s_sge[j].addr = odp_addr;
+				send->s_sge[j].lkey = odp_lkey;
+			}
 			send->s_sge[j].length = len;
-			send->s_sge[j].lkey = ic->i_pd->local_dma_lkey;

 			sent += len;
 			rdsdebug("ic %p sent %d remote_addr %llu\n", ic, sent, remote_addr);

 			remote_addr += len;
+			odp_addr += len;
 			scat++;
 		}

diff --git a/net/rds/rdma.c b/net/rds/rdma.c
index eb23c38ce2b3..3c6afdda709b 100644
--- a/net/rds/rdma.c
+++ b/net/rds/rdma.c
@@ -177,13 +177,14 @@  static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
 			  struct rds_conn_path *cp)
 {
 	struct rds_mr *mr = NULL, *found;
+	struct scatterlist *sg = NULL;
 	unsigned int nr_pages;
 	struct page **pages = NULL;
-	struct scatterlist *sg;
 	void *trans_private;
 	unsigned long flags;
 	rds_rdma_cookie_t cookie;
-	unsigned int nents;
+	unsigned int nents = 0;
+	int need_odp = 0;
 	long i;
 	int ret;

@@ -196,6 +197,20 @@  static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
 		ret = -EOPNOTSUPP;
 		goto out;
 	}
+	/* If the combination of the addr and size requested for this memory
+	 * region causes an integer overflow, return error.
+	 */
+	if (((args->vec.addr + args->vec.bytes) < args->vec.addr) ||
+	    PAGE_ALIGN(args->vec.addr + args->vec.bytes) <
+		    (args->vec.addr + args->vec.bytes)) {
+		ret = -EINVAL;
+		goto out;
+	}
+
+	if (!can_do_mlock()) {
+		ret = -EPERM;
+		goto out;
+	}

 	nr_pages = rds_pages_in_vec(&args->vec);
 	if (nr_pages == 0) {
@@ -250,36 +265,44 @@  static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
 	 * the zero page.
 	 */
 	ret = rds_pin_pages(args->vec.addr, nr_pages, pages, 1);
-	if (ret < 0)
+	if (ret == -EOPNOTSUPP) {
+		need_odp = 1;
+	} else if (ret <= 0) {
 		goto out;
+	} else {
+		nents = ret;
+		sg = kcalloc(nents, sizeof(*sg), GFP_KERNEL);
+		if (!sg) {
+			ret = -ENOMEM;
+			goto out;
+		}
+		WARN_ON(!nents);
+		sg_init_table(sg, nents);

-	nents = ret;
-	sg = kcalloc(nents, sizeof(*sg), GFP_KERNEL);
-	if (!sg) {
-		ret = -ENOMEM;
-		goto out;
-	}
-	WARN_ON(!nents);
-	sg_init_table(sg, nents);
-
-	/* Stick all pages into the scatterlist */
-	for (i = 0 ; i < nents; i++)
-		sg_set_page(&sg[i], pages[i], PAGE_SIZE, 0);
-
-	rdsdebug("RDS: trans_private nents is %u\n", nents);
+		/* Stick all pages into the scatterlist */
+		for (i = 0 ; i < nents; i++)
+			sg_set_page(&sg[i], pages[i], PAGE_SIZE, 0);

+		rdsdebug("RDS: trans_private nents is %u\n", nents);
+	}
 	/* Obtain a transport specific MR. If this succeeds, the
 	 * s/g list is now owned by the MR.
 	 * Note that dma_map() implies that pending writes are
 	 * flushed to RAM, so no dma_sync is needed here. */
-	trans_private = rs->rs_transport->get_mr(sg, nents, rs,
-						 &mr->r_key,
-						 cp ? cp->cp_conn : NULL);
+	trans_private = rs->rs_transport->get_mr(
+		sg, nents, rs, &mr->r_key, cp ? cp->cp_conn : NULL,
+		args->vec.addr, args->vec.bytes,
+		need_odp ? ODP_ZEROBASED : ODP_NOT_NEEDED);

 	if (IS_ERR(trans_private)) {
-		for (i = 0 ; i < nents; i++)
-			put_page(sg_page(&sg[i]));
-		kfree(sg);
+		/* In ODP case, we don't GUP pages, so don't need
+		 * to release anything.
+		 */
+		if (!need_odp) {
+			for (i = 0 ; i < nents; i++)
+				put_page(sg_page(&sg[i]));
+			kfree(sg);
+		}
 		ret = PTR_ERR(trans_private);
 		goto out;
 	}
@@ -293,7 +316,11 @@  static int __rds_rdma_map(struct rds_sock *rs, struct rds_get_mr_args *args,
 	 * map page aligned regions. So we keep the offset, and build
 	 * a 64bit cookie containing <R_Key, offset> and pass that
 	 * around. */
-	cookie = rds_rdma_make_cookie(mr->r_key, args->vec.addr & ~PAGE_MASK);
+	if (need_odp)
+		cookie = rds_rdma_make_cookie(mr->r_key, 0);
+	else
+		cookie = rds_rdma_make_cookie(mr->r_key,
+					      args->vec.addr & ~PAGE_MASK);
 	if (cookie_ret)
 		*cookie_ret = cookie;

@@ -458,22 +485,26 @@  void rds_rdma_free_op(struct rm_rdma_op *ro)
 {
 	unsigned int i;

-	for (i = 0; i < ro->op_nents; i++) {
-		struct page *page = sg_page(&ro->op_sg[i]);
-
-		/* Mark page dirty if it was possibly modified, which
-		 * is the case for a RDMA_READ which copies from remote
-		 * to local memory */
-		if (!ro->op_write) {
-			WARN_ON(!page->mapping && irqs_disabled());
-			set_page_dirty(page);
+	if (ro->op_odp_mr) {
+		rds_mr_put(ro->op_odp_mr);
+	} else {
+		for (i = 0; i < ro->op_nents; i++) {
+			struct page *page = sg_page(&ro->op_sg[i]);
+
+			/* Mark page dirty if it was possibly modified, which
+			 * is the case for a RDMA_READ which copies from remote
+			 * to local memory
+			 */
+			if (!ro->op_write)
+				set_page_dirty(page);
+			put_page(page);
 		}
-		put_page(page);
 	}

 	kfree(ro->op_notifier);
 	ro->op_notifier = NULL;
 	ro->op_active = 0;
+	ro->op_odp_mr = NULL;
 }

 void rds_atomic_free_op(struct rm_atomic_op *ao)
@@ -583,6 +614,7 @@  int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm,
 	struct rds_iovec *iovs;
 	unsigned int i, j;
 	int ret = 0;
+	bool odp_supported = true;

 	if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct rds_rdma_args))
 	    || rm->rdma.op_active)
@@ -604,6 +636,9 @@  int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm,
 		ret = -EINVAL;
 		goto out_ret;
 	}
+	/* odp-mr is not supported for multiple requests within one message */
+	if (args->nr_local != 1)
+		odp_supported = false;

 	iovs = vec->iov;

@@ -625,6 +660,8 @@  int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm,
 	op->op_silent = !!(args->flags & RDS_RDMA_SILENT);
 	op->op_active = 1;
 	op->op_recverr = rs->rs_recverr;
+	op->op_odp_mr = NULL;
+
 	WARN_ON(!nr_pages);
 	op->op_sg = rds_message_alloc_sgs(rm, nr_pages, &ret);
 	if (!op->op_sg)
@@ -674,10 +711,44 @@  int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm,
 		 * If it's a READ operation, we need to pin the pages for writing.
 		 */
 		ret = rds_pin_pages(iov->addr, nr, pages, !op->op_write);
-		if (ret < 0)
+		if ((!odp_supported && ret <= 0) ||
+		    (odp_supported && ret <= 0 && ret != -EOPNOTSUPP))
 			goto out_pages;
-		else
-			ret = 0;
+
+		if (ret == -EOPNOTSUPP) {
+			struct rds_mr *local_odp_mr;
+
+			if (!rs->rs_transport->get_mr) {
+				ret = -EOPNOTSUPP;
+				goto out_pages;
+			}
+			local_odp_mr =
+				kzalloc(sizeof(*local_odp_mr), GFP_KERNEL);
+			if (!local_odp_mr) {
+				ret = -ENOMEM;
+				goto out_pages;
+			}
+			RB_CLEAR_NODE(&local_odp_mr->r_rb_node);
+			refcount_set(&local_odp_mr->r_refcount, 1);
+			local_odp_mr->r_trans = rs->rs_transport;
+			local_odp_mr->r_sock = rs;
+			local_odp_mr->r_trans_private =
+				rs->rs_transport->get_mr(
+					NULL, 0, rs, &local_odp_mr->r_key, NULL,
+					iov->addr, iov->bytes, ODP_VIRTUAL);
+			if (IS_ERR(local_odp_mr->r_trans_private)) {
+				ret = PTR_ERR(local_odp_mr->r_trans_private);
+				rdsdebug("get_mr ret %d %p\n", ret,
+					 local_odp_mr->r_trans_private);
+				kfree(local_odp_mr);
+				ret = -EOPNOTSUPP;
+				goto out_pages;
+			}
+			rdsdebug("Need odp; local_odp_mr %p trans_private %p\n",
+				 local_odp_mr, local_odp_mr->r_trans_private);
+			op->op_odp_mr = local_odp_mr;
+			op->op_odp_addr = iov->addr;
+		}

 		rdsdebug("RDS: nr_bytes %u nr %u iov->bytes %llu iov->addr %llx\n",
 			 nr_bytes, nr, iov->bytes, iov->addr);
@@ -693,6 +764,7 @@  int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm,
 					min_t(unsigned int, iov->bytes, PAGE_SIZE - offset),
 					offset);

+			sg->dma_length = sg->length;
 			rdsdebug("RDS: sg->offset %x sg->len %x iov->addr %llx iov->bytes %llu\n",
 			       sg->offset, sg->length, iov->addr, iov->bytes);

@@ -711,6 +783,7 @@  int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm,
 		goto out_pages;
 	}
 	op->op_bytes = nr_bytes;
+	ret = 0;

 out_pages:
 	kfree(pages);
@@ -757,7 +830,8 @@  int rds_cmsg_rdma_dest(struct rds_sock *rs, struct rds_message *rm,
 	spin_unlock_irqrestore(&rs->rs_rdma_lock, flags);

 	if (mr) {
-		mr->r_trans->sync_mr(mr->r_trans_private, DMA_TO_DEVICE);
+		mr->r_trans->sync_mr(mr->r_trans_private,
+				     DMA_TO_DEVICE);
 		rm->rdma.op_rdma_mr = mr;
 	}
 	return err;
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 53e86911773a..e4a603523083 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -40,7 +40,6 @@ 
 #ifdef ATOMIC64_INIT
 #define KERNEL_HAS_ATOMIC64
 #endif
-
 #ifdef RDS_DEBUG
 #define rdsdebug(fmt, args...) pr_debug("%s(): " fmt, __func__ , ##args)
 #else
@@ -478,6 +477,9 @@  struct rds_message {
 			struct rds_notifier	*op_notifier;

 			struct rds_mr		*op_rdma_mr;
+
+			u64			op_odp_addr;
+			struct rds_mr		*op_odp_mr;
 		} rdma;
 		struct rm_data_op {
 			unsigned int		op_active:1;
@@ -573,7 +575,8 @@  struct rds_transport {
 	void (*exit)(void);
 	void *(*get_mr)(struct scatterlist *sg, unsigned long nr_sg,
 			struct rds_sock *rs, u32 *key_ret,
-			struct rds_connection *conn);
+			struct rds_connection *conn,
+			u64 start, u64 length, int need_odp);
 	void (*sync_mr)(void *trans_private, int direction);
 	void (*free_mr)(void *trans_private, int invalidate);
 	void (*flush_mrs)(void);
@@ -956,6 +959,12 @@  static inline bool rds_destroy_pending(struct rds_connection *conn)
 	       (conn->c_trans->t_unloading && conn->c_trans->t_unloading(conn));
 }

+enum {
+	ODP_NOT_NEEDED,
+	ODP_ZEROBASED,
+	ODP_VIRTUAL
+};
+
 /* stats.c */
 DECLARE_PER_CPU_SHARED_ALIGNED(struct rds_statistics, rds_stats);
 #define rds_stats_inc_which(which, member) do {		\