diff mbox series

[for-next,v2,2/2] RDMA/rxe: Protect user space index loads/stores

Message ID 20210526045139.634978-3-rpearsonhpe@gmail.com (mailing list archive)
State Superseded
Headers show
Series Fix memory ordering errors in queues | expand

Commit Message

Bob Pearson May 26, 2021, 4:51 a.m. UTC
Modify the queue APIs to protect all user space index loads
with smp_load_acquire() and all user space index stores with
smp_store_release(). Base this on the types of the queues which
can be one of ..KERNEL, ..FROM_USER, ..TO_USER. Kernel space
indices are protected by locks which also provide memory barriers.

Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>
---
v2:
  In v2 use queue type to selectively protect user space indices.
---
 drivers/infiniband/sw/rxe/rxe_queue.h | 168 ++++++++++++++++++--------
 1 file changed, 117 insertions(+), 51 deletions(-)

Comments

Zhu Yanjun May 26, 2021, 10:19 a.m. UTC | #1
On Wed, May 26, 2021 at 12:52 PM Bob Pearson <rpearsonhpe@gmail.com> wrote:
>
> Modify the queue APIs to protect all user space index loads
> with smp_load_acquire() and all user space index stores with
> smp_store_release(). Base this on the types of the queues which
> can be one of ..KERNEL, ..FROM_USER, ..TO_USER. Kernel space
> indices are protected by locks which also provide memory barriers.
>
> Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>
> ---
> v2:
>   In v2 use queue type to selectively protect user space indices.
> ---
>  drivers/infiniband/sw/rxe/rxe_queue.h | 168 ++++++++++++++++++--------
>  1 file changed, 117 insertions(+), 51 deletions(-)
>
> diff --git a/drivers/infiniband/sw/rxe/rxe_queue.h b/drivers/infiniband/sw/rxe/rxe_queue.h
> index 4512745419f8..6e705e09d357 100644
> --- a/drivers/infiniband/sw/rxe/rxe_queue.h
> +++ b/drivers/infiniband/sw/rxe/rxe_queue.h
> @@ -66,12 +66,22 @@ static inline int queue_empty(struct rxe_queue *q)
>         u32 prod;
>         u32 cons;
>
> -       /* make sure all changes to queue complete before
> -        * testing queue empty
> -        */
> -       prod = smp_load_acquire(&q->buf->producer_index);
> -       /* same */
> -       cons = smp_load_acquire(&q->buf->consumer_index);
> +       switch (q->type) {
> +       case QUEUE_TYPE_FROM_USER:
> +               /* protect user space index */
> +               prod = smp_load_acquire(&q->buf->producer_index);
> +               cons = q->buf->consumer_index;
> +               break;
> +       case QUEUE_TYPE_TO_USER:
> +               prod = q->buf->producer_index;
> +               /* protect user space index */
> +               cons = smp_load_acquire(&q->buf->consumer_index);
> +               break;
> +       case QUEUE_TYPE_KERNEL:
> +               prod = q->buf->producer_index;
> +               cons = q->buf->consumer_index;
> +               break;
> +       }
>
>         return ((prod - cons) & q->index_mask) == 0;
>  }
> @@ -81,95 +91,151 @@ static inline int queue_full(struct rxe_queue *q)
>         u32 prod;
>         u32 cons;
>
> -       /* make sure all changes to queue complete before
> -        * testing queue full
> -        */
> -       prod = smp_load_acquire(&q->buf->producer_index);
> -       /* same */
> -       cons = smp_load_acquire(&q->buf->consumer_index);
> +       switch (q->type) {
> +       case QUEUE_TYPE_FROM_USER:
> +               /* protect user space index */
> +               prod = smp_load_acquire(&q->buf->producer_index);
> +               cons = q->buf->consumer_index;
> +               break;
> +       case QUEUE_TYPE_TO_USER:
> +               prod = q->buf->producer_index;
> +               /* protect user space index */
> +               cons = smp_load_acquire(&q->buf->consumer_index);
> +               break;
> +       case QUEUE_TYPE_KERNEL:
> +               prod = q->buf->producer_index;
> +               cons = q->buf->consumer_index;
> +               break;
> +       }
>
>         return ((prod + 1 - cons) & q->index_mask) == 0;
>  }
>
> -static inline void advance_producer(struct rxe_queue *q)
> +static inline unsigned int queue_count(const struct rxe_queue *q)
>  {
>         u32 prod;
> +       u32 cons;
>
> -       prod = (q->buf->producer_index + 1) & q->index_mask;
> +       switch (q->type) {
> +       case QUEUE_TYPE_FROM_USER:
> +               /* protect user space index */
> +               prod = smp_load_acquire(&q->buf->producer_index);
> +               cons = q->buf->consumer_index;
> +               break;
> +       case QUEUE_TYPE_TO_USER:
> +               prod = q->buf->producer_index;
> +               /* protect user space index */
> +               cons = smp_load_acquire(&q->buf->consumer_index);
> +               break;
> +       case QUEUE_TYPE_KERNEL:
> +               prod = q->buf->producer_index;
> +               cons = q->buf->consumer_index;
> +               break;
> +       }

The above source code appears in the functions queue_count, queue_full
and queue_empty.
So is it possible to use a seperate function to implement it?

Thanks
Zhu Yanjun

> +
> +       return (prod - cons) & q->index_mask;
> +}
> +
> +static inline void advance_producer(struct rxe_queue *q)
> +{
> +       u32 prod;
>
> -       /* make sure all changes to queue complete before
> -        * changing producer index
> -        */
> -       smp_store_release(&q->buf->producer_index, prod);
> +       if (q->type == QUEUE_TYPE_FROM_USER) {
> +               /* protect user space index */
> +               prod = smp_load_acquire(&q->buf->producer_index);
> +               prod = (prod + 1) & q->index_mask;
> +               /* same */
> +               smp_store_release(&q->buf->producer_index, prod);
> +       } else {
> +               prod = q->buf->producer_index;
> +               q->buf->producer_index = (prod + 1) & q->index_mask;
> +       }
>  }
>
>  static inline void advance_consumer(struct rxe_queue *q)
>  {
>         u32 cons;
>
> -       cons = (q->buf->consumer_index + 1) & q->index_mask;
> -
> -       /* make sure all changes to queue complete before
> -        * changing consumer index
> -        */
> -       smp_store_release(&q->buf->consumer_index, cons);
> +       if (q->type == QUEUE_TYPE_TO_USER) {
> +               /* protect user space index */
> +               cons = smp_load_acquire(&q->buf->consumer_index);
> +               cons = (cons + 1) & q->index_mask;
> +               /* same */
> +               smp_store_release(&q->buf->consumer_index, cons);
> +       } else {
> +               cons = q->buf->consumer_index;
> +               q->buf->consumer_index = (cons + 1) & q->index_mask;
> +       }
>  }
>
>  static inline void *producer_addr(struct rxe_queue *q)
>  {
> -       return q->buf->data + ((q->buf->producer_index & q->index_mask)
> -                               << q->log2_elem_size);
> +       u32 prod;
> +
> +       if (q->type == QUEUE_TYPE_FROM_USER)
> +               /* protect user space index */
> +               prod = smp_load_acquire(&q->buf->producer_index);
> +       else
> +               prod = q->buf->producer_index;
> +
> +       return q->buf->data + ((prod & q->index_mask) << q->log2_elem_size);
>  }
>
>  static inline void *consumer_addr(struct rxe_queue *q)
>  {
> -       return q->buf->data + ((q->buf->consumer_index & q->index_mask)
> -                               << q->log2_elem_size);
> +       u32 cons;
> +
> +       if (q->type == QUEUE_TYPE_TO_USER)
> +               /* protect user space index */
> +               cons = smp_load_acquire(&q->buf->consumer_index);
> +       else
> +               cons = q->buf->consumer_index;
> +
> +       return q->buf->data + ((cons & q->index_mask) << q->log2_elem_size);
>  }
>
>  static inline unsigned int producer_index(struct rxe_queue *q)
>  {
> -       u32 index;
> +       u32 prod;
> +
> +       if (q->type == QUEUE_TYPE_FROM_USER)
> +               /* protect user space index */
> +               prod = smp_load_acquire(&q->buf->producer_index);
> +       else
> +               prod = q->buf->producer_index;
>
> -       /* make sure all changes to queue
> -        * complete before getting producer index
> -        */
> -       index = smp_load_acquire(&q->buf->producer_index);
> -       index &= q->index_mask;
> +       prod &= q->index_mask;
>
> -       return index;
> +       return prod;
>  }
>
>  static inline unsigned int consumer_index(struct rxe_queue *q)
>  {
> -       u32 index;
> +       u32 cons;
>
> -       /* make sure all changes to queue
> -        * complete before getting consumer index
> -        */
> -       index = smp_load_acquire(&q->buf->consumer_index);
> -       index &= q->index_mask;
> +       if (q->type == QUEUE_TYPE_TO_USER)
> +               /* protect user space index */
> +               cons = smp_load_acquire(&q->buf->consumer_index);
> +       else
> +               cons = q->buf->consumer_index;
>
> -       return index;
> +       cons &= q->index_mask;
> +
> +       return cons;
>  }
>
> -static inline void *addr_from_index(struct rxe_queue *q, unsigned int index)
> +static inline void *addr_from_index(struct rxe_queue *q,
> +                               unsigned int index)
>  {
>         return q->buf->data + ((index & q->index_mask)
>                                 << q->buf->log2_elem_size);
>  }
>
>  static inline unsigned int index_from_addr(const struct rxe_queue *q,
> -                                          const void *addr)
> +                               const void *addr)
>  {
>         return (((u8 *)addr - q->buf->data) >> q->log2_elem_size)
> -               & q->index_mask;
> -}
> -
> -static inline unsigned int queue_count(const struct rxe_queue *q)
> -{
> -       return (q->buf->producer_index - q->buf->consumer_index)
> -               & q->index_mask;
> +                               & q->index_mask;
>  }
>
>  static inline void *queue_head(struct rxe_queue *q)
> --
> 2.30.2
>
Bob Pearson May 26, 2021, 3:23 p.m. UTC | #2
On 5/26/2021 5:19 AM, Zhu Yanjun wrote:
> On Wed, May 26, 2021 at 12:52 PM Bob Pearson <rpearsonhpe@gmail.com> wrote:
>> Modify the queue APIs to protect all user space index loads
>> with smp_load_acquire() and all user space index stores with
>> smp_store_release(). Base this on the types of the queues which
>> can be one of ..KERNEL, ..FROM_USER, ..TO_USER. Kernel space
>> indices are protected by locks which also provide memory barriers.
>>
>> Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>
>> ---
>> v2:
>>    In v2 use queue type to selectively protect user space indices.
>> ---
>>   drivers/infiniband/sw/rxe/rxe_queue.h | 168 ++++++++++++++++++--------
>>   1 file changed, 117 insertions(+), 51 deletions(-)
>>
>> diff --git a/drivers/infiniband/sw/rxe/rxe_queue.h b/drivers/infiniband/sw/rxe/rxe_queue.h
>> index 4512745419f8..6e705e09d357 100644
>> --- a/drivers/infiniband/sw/rxe/rxe_queue.h
>> +++ b/drivers/infiniband/sw/rxe/rxe_queue.h
>> @@ -66,12 +66,22 @@ static inline int queue_empty(struct rxe_queue *q)
>>          u32 prod;
>>          u32 cons;
>>
>> -       /* make sure all changes to queue complete before
>> -        * testing queue empty
>> -        */
>> -       prod = smp_load_acquire(&q->buf->producer_index);
>> -       /* same */
>> -       cons = smp_load_acquire(&q->buf->consumer_index);
>> +       switch (q->type) {
>> +       case QUEUE_TYPE_FROM_USER:
>> +               /* protect user space index */
>> +               prod = smp_load_acquire(&q->buf->producer_index);
>> +               cons = q->buf->consumer_index;
>> +               break;
>> +       case QUEUE_TYPE_TO_USER:
>> +               prod = q->buf->producer_index;
>> +               /* protect user space index */
>> +               cons = smp_load_acquire(&q->buf->consumer_index);
>> +               break;
>> +       case QUEUE_TYPE_KERNEL:
>> +               prod = q->buf->producer_index;
>> +               cons = q->buf->consumer_index;
>> +               break;
>> +       }
>>
>>          return ((prod - cons) & q->index_mask) == 0;
>>   }
>> @@ -81,95 +91,151 @@ static inline int queue_full(struct rxe_queue *q)
>>          u32 prod;
>>          u32 cons;
>>
>> -       /* make sure all changes to queue complete before
>> -        * testing queue full
>> -        */
>> -       prod = smp_load_acquire(&q->buf->producer_index);
>> -       /* same */
>> -       cons = smp_load_acquire(&q->buf->consumer_index);
>> +       switch (q->type) {
>> +       case QUEUE_TYPE_FROM_USER:
>> +               /* protect user space index */
>> +               prod = smp_load_acquire(&q->buf->producer_index);
>> +               cons = q->buf->consumer_index;
>> +               break;
>> +       case QUEUE_TYPE_TO_USER:
>> +               prod = q->buf->producer_index;
>> +               /* protect user space index */
>> +               cons = smp_load_acquire(&q->buf->consumer_index);
>> +               break;
>> +       case QUEUE_TYPE_KERNEL:
>> +               prod = q->buf->producer_index;
>> +               cons = q->buf->consumer_index;
>> +               break;
>> +       }
>>
>>          return ((prod + 1 - cons) & q->index_mask) == 0;
>>   }
>>
>> -static inline void advance_producer(struct rxe_queue *q)
>> +static inline unsigned int queue_count(const struct rxe_queue *q)
>>   {
>>          u32 prod;
>> +       u32 cons;
>>
>> -       prod = (q->buf->producer_index + 1) & q->index_mask;
>> +       switch (q->type) {
>> +       case QUEUE_TYPE_FROM_USER:
>> +               /* protect user space index */
>> +               prod = smp_load_acquire(&q->buf->producer_index);
>> +               cons = q->buf->consumer_index;
>> +               break;
>> +       case QUEUE_TYPE_TO_USER:
>> +               prod = q->buf->producer_index;
>> +               /* protect user space index */
>> +               cons = smp_load_acquire(&q->buf->consumer_index);
>> +               break;
>> +       case QUEUE_TYPE_KERNEL:
>> +               prod = q->buf->producer_index;
>> +               cons = q->buf->consumer_index;
>> +               break;
>> +       }
> The above source code appears in the functions queue_count, queue_full
> and queue_empty.
> So is it possible to use a seperate function to implement it?
>
> Thanks
> Zhu Yanjun

Maybe, but I was trying to keep this inline. It won't save anything at 
runtime.

Bob

>
>> +
>> +       return (prod - cons) & q->index_mask;
>> +}
>> +
>> +static inline void advance_producer(struct rxe_queue *q)
>> +{
>> +       u32 prod;
>>
>> -       /* make sure all changes to queue complete before
>> -        * changing producer index
>> -        */
>> -       smp_store_release(&q->buf->producer_index, prod);
>> +       if (q->type == QUEUE_TYPE_FROM_USER) {
>> +               /* protect user space index */
>> +               prod = smp_load_acquire(&q->buf->producer_index);
>> +               prod = (prod + 1) & q->index_mask;
>> +               /* same */
>> +               smp_store_release(&q->buf->producer_index, prod);
>> +       } else {
>> +               prod = q->buf->producer_index;
>> +               q->buf->producer_index = (prod + 1) & q->index_mask;
>> +       }
>>   }
>>
>>   static inline void advance_consumer(struct rxe_queue *q)
>>   {
>>          u32 cons;
>>
>> -       cons = (q->buf->consumer_index + 1) & q->index_mask;
>> -
>> -       /* make sure all changes to queue complete before
>> -        * changing consumer index
>> -        */
>> -       smp_store_release(&q->buf->consumer_index, cons);
>> +       if (q->type == QUEUE_TYPE_TO_USER) {
>> +               /* protect user space index */
>> +               cons = smp_load_acquire(&q->buf->consumer_index);
>> +               cons = (cons + 1) & q->index_mask;
>> +               /* same */
>> +               smp_store_release(&q->buf->consumer_index, cons);
>> +       } else {
>> +               cons = q->buf->consumer_index;
>> +               q->buf->consumer_index = (cons + 1) & q->index_mask;
>> +       }
>>   }
>>
>>   static inline void *producer_addr(struct rxe_queue *q)
>>   {
>> -       return q->buf->data + ((q->buf->producer_index & q->index_mask)
>> -                               << q->log2_elem_size);
>> +       u32 prod;
>> +
>> +       if (q->type == QUEUE_TYPE_FROM_USER)
>> +               /* protect user space index */
>> +               prod = smp_load_acquire(&q->buf->producer_index);
>> +       else
>> +               prod = q->buf->producer_index;
>> +
>> +       return q->buf->data + ((prod & q->index_mask) << q->log2_elem_size);
>>   }
>>
>>   static inline void *consumer_addr(struct rxe_queue *q)
>>   {
>> -       return q->buf->data + ((q->buf->consumer_index & q->index_mask)
>> -                               << q->log2_elem_size);
>> +       u32 cons;
>> +
>> +       if (q->type == QUEUE_TYPE_TO_USER)
>> +               /* protect user space index */
>> +               cons = smp_load_acquire(&q->buf->consumer_index);
>> +       else
>> +               cons = q->buf->consumer_index;
>> +
>> +       return q->buf->data + ((cons & q->index_mask) << q->log2_elem_size);
>>   }
>>
>>   static inline unsigned int producer_index(struct rxe_queue *q)
>>   {
>> -       u32 index;
>> +       u32 prod;
>> +
>> +       if (q->type == QUEUE_TYPE_FROM_USER)
>> +               /* protect user space index */
>> +               prod = smp_load_acquire(&q->buf->producer_index);
>> +       else
>> +               prod = q->buf->producer_index;
>>
>> -       /* make sure all changes to queue
>> -        * complete before getting producer index
>> -        */
>> -       index = smp_load_acquire(&q->buf->producer_index);
>> -       index &= q->index_mask;
>> +       prod &= q->index_mask;
>>
>> -       return index;
>> +       return prod;
>>   }
>>
>>   static inline unsigned int consumer_index(struct rxe_queue *q)
>>   {
>> -       u32 index;
>> +       u32 cons;
>>
>> -       /* make sure all changes to queue
>> -        * complete before getting consumer index
>> -        */
>> -       index = smp_load_acquire(&q->buf->consumer_index);
>> -       index &= q->index_mask;
>> +       if (q->type == QUEUE_TYPE_TO_USER)
>> +               /* protect user space index */
>> +               cons = smp_load_acquire(&q->buf->consumer_index);
>> +       else
>> +               cons = q->buf->consumer_index;
>>
>> -       return index;
>> +       cons &= q->index_mask;
>> +
>> +       return cons;
>>   }
>>
>> -static inline void *addr_from_index(struct rxe_queue *q, unsigned int index)
>> +static inline void *addr_from_index(struct rxe_queue *q,
>> +                               unsigned int index)
>>   {
>>          return q->buf->data + ((index & q->index_mask)
>>                                  << q->buf->log2_elem_size);
>>   }
>>
>>   static inline unsigned int index_from_addr(const struct rxe_queue *q,
>> -                                          const void *addr)
>> +                               const void *addr)
>>   {
>>          return (((u8 *)addr - q->buf->data) >> q->log2_elem_size)
>> -               & q->index_mask;
>> -}
>> -
>> -static inline unsigned int queue_count(const struct rxe_queue *q)
>> -{
>> -       return (q->buf->producer_index - q->buf->consumer_index)
>> -               & q->index_mask;
>> +                               & q->index_mask;
>>   }
>>
>>   static inline void *queue_head(struct rxe_queue *q)
>> --
>> 2.30.2
>>
Jason Gunthorpe May 26, 2021, 4:52 p.m. UTC | #3
On Tue, May 25, 2021 at 11:51:40PM -0500, Bob Pearson wrote:
> Modify the queue APIs to protect all user space index loads
> with smp_load_acquire() and all user space index stores with
> smp_store_release(). Base this on the types of the queues which
> can be one of ..KERNEL, ..FROM_USER, ..TO_USER. Kernel space
> indices are protected by locks which also provide memory barriers.
> 
> Signed-off-by: Bob Pearson <rpearsonhpe@gmail.com>
> v2:
>   In v2 use queue type to selectively protect user space indices.
>  drivers/infiniband/sw/rxe/rxe_queue.h | 168 ++++++++++++++++++--------
>  1 file changed, 117 insertions(+), 51 deletions(-)
> 
> diff --git a/drivers/infiniband/sw/rxe/rxe_queue.h b/drivers/infiniband/sw/rxe/rxe_queue.h
> index 4512745419f8..6e705e09d357 100644
> +++ b/drivers/infiniband/sw/rxe/rxe_queue.h
> @@ -66,12 +66,22 @@ static inline int queue_empty(struct rxe_queue *q)
>  	u32 prod;
>  	u32 cons;
>  
> -	/* make sure all changes to queue complete before
> -	 * testing queue empty
> -	 */
> -	prod = smp_load_acquire(&q->buf->producer_index);
> -	/* same */
> -	cons = smp_load_acquire(&q->buf->consumer_index);
> +	switch (q->type) {
> +	case QUEUE_TYPE_FROM_USER:
> +		/* protect user space index */
> +		prod = smp_load_acquire(&q->buf->producer_index);
> +		cons = q->buf->consumer_index;

The other issue is you can't store the kernel owned consumer_index in
the 'buf'

It should be stored in 'q' and only on write copied to be buf

Kernel never reads the user memory it writes to

This is why splitting it makes sense because it really needs to be
reading different memory, not just using the correct load primitive

Jason
diff mbox series

Patch

diff --git a/drivers/infiniband/sw/rxe/rxe_queue.h b/drivers/infiniband/sw/rxe/rxe_queue.h
index 4512745419f8..6e705e09d357 100644
--- a/drivers/infiniband/sw/rxe/rxe_queue.h
+++ b/drivers/infiniband/sw/rxe/rxe_queue.h
@@ -66,12 +66,22 @@  static inline int queue_empty(struct rxe_queue *q)
 	u32 prod;
 	u32 cons;
 
-	/* make sure all changes to queue complete before
-	 * testing queue empty
-	 */
-	prod = smp_load_acquire(&q->buf->producer_index);
-	/* same */
-	cons = smp_load_acquire(&q->buf->consumer_index);
+	switch (q->type) {
+	case QUEUE_TYPE_FROM_USER:
+		/* protect user space index */
+		prod = smp_load_acquire(&q->buf->producer_index);
+		cons = q->buf->consumer_index;
+		break;
+	case QUEUE_TYPE_TO_USER:
+		prod = q->buf->producer_index;
+		/* protect user space index */
+		cons = smp_load_acquire(&q->buf->consumer_index);
+		break;
+	case QUEUE_TYPE_KERNEL:
+		prod = q->buf->producer_index;
+		cons = q->buf->consumer_index;
+		break;
+	}
 
 	return ((prod - cons) & q->index_mask) == 0;
 }
@@ -81,95 +91,151 @@  static inline int queue_full(struct rxe_queue *q)
 	u32 prod;
 	u32 cons;
 
-	/* make sure all changes to queue complete before
-	 * testing queue full
-	 */
-	prod = smp_load_acquire(&q->buf->producer_index);
-	/* same */
-	cons = smp_load_acquire(&q->buf->consumer_index);
+	switch (q->type) {
+	case QUEUE_TYPE_FROM_USER:
+		/* protect user space index */
+		prod = smp_load_acquire(&q->buf->producer_index);
+		cons = q->buf->consumer_index;
+		break;
+	case QUEUE_TYPE_TO_USER:
+		prod = q->buf->producer_index;
+		/* protect user space index */
+		cons = smp_load_acquire(&q->buf->consumer_index);
+		break;
+	case QUEUE_TYPE_KERNEL:
+		prod = q->buf->producer_index;
+		cons = q->buf->consumer_index;
+		break;
+	}
 
 	return ((prod + 1 - cons) & q->index_mask) == 0;
 }
 
-static inline void advance_producer(struct rxe_queue *q)
+static inline unsigned int queue_count(const struct rxe_queue *q)
 {
 	u32 prod;
+	u32 cons;
 
-	prod = (q->buf->producer_index + 1) & q->index_mask;
+	switch (q->type) {
+	case QUEUE_TYPE_FROM_USER:
+		/* protect user space index */
+		prod = smp_load_acquire(&q->buf->producer_index);
+		cons = q->buf->consumer_index;
+		break;
+	case QUEUE_TYPE_TO_USER:
+		prod = q->buf->producer_index;
+		/* protect user space index */
+		cons = smp_load_acquire(&q->buf->consumer_index);
+		break;
+	case QUEUE_TYPE_KERNEL:
+		prod = q->buf->producer_index;
+		cons = q->buf->consumer_index;
+		break;
+	}
+
+	return (prod - cons) & q->index_mask;
+}
+
+static inline void advance_producer(struct rxe_queue *q)
+{
+	u32 prod;
 
-	/* make sure all changes to queue complete before
-	 * changing producer index
-	 */
-	smp_store_release(&q->buf->producer_index, prod);
+	if (q->type == QUEUE_TYPE_FROM_USER) {
+		/* protect user space index */
+		prod = smp_load_acquire(&q->buf->producer_index);
+		prod = (prod + 1) & q->index_mask;
+		/* same */
+		smp_store_release(&q->buf->producer_index, prod);
+	} else {
+		prod = q->buf->producer_index;
+		q->buf->producer_index = (prod + 1) & q->index_mask;
+	}
 }
 
 static inline void advance_consumer(struct rxe_queue *q)
 {
 	u32 cons;
 
-	cons = (q->buf->consumer_index + 1) & q->index_mask;
-
-	/* make sure all changes to queue complete before
-	 * changing consumer index
-	 */
-	smp_store_release(&q->buf->consumer_index, cons);
+	if (q->type == QUEUE_TYPE_TO_USER) {
+		/* protect user space index */
+		cons = smp_load_acquire(&q->buf->consumer_index);
+		cons = (cons + 1) & q->index_mask;
+		/* same */
+		smp_store_release(&q->buf->consumer_index, cons);
+	} else {
+		cons = q->buf->consumer_index;
+		q->buf->consumer_index = (cons + 1) & q->index_mask;
+	}
 }
 
 static inline void *producer_addr(struct rxe_queue *q)
 {
-	return q->buf->data + ((q->buf->producer_index & q->index_mask)
-				<< q->log2_elem_size);
+	u32 prod;
+
+	if (q->type == QUEUE_TYPE_FROM_USER)
+		/* protect user space index */
+		prod = smp_load_acquire(&q->buf->producer_index);
+	else
+		prod = q->buf->producer_index;
+
+	return q->buf->data + ((prod & q->index_mask) << q->log2_elem_size);
 }
 
 static inline void *consumer_addr(struct rxe_queue *q)
 {
-	return q->buf->data + ((q->buf->consumer_index & q->index_mask)
-				<< q->log2_elem_size);
+	u32 cons;
+
+	if (q->type == QUEUE_TYPE_TO_USER)
+		/* protect user space index */
+		cons = smp_load_acquire(&q->buf->consumer_index);
+	else
+		cons = q->buf->consumer_index;
+
+	return q->buf->data + ((cons & q->index_mask) << q->log2_elem_size);
 }
 
 static inline unsigned int producer_index(struct rxe_queue *q)
 {
-	u32 index;
+	u32 prod;
+
+	if (q->type == QUEUE_TYPE_FROM_USER)
+		/* protect user space index */
+		prod = smp_load_acquire(&q->buf->producer_index);
+	else
+		prod = q->buf->producer_index;
 
-	/* make sure all changes to queue
-	 * complete before getting producer index
-	 */
-	index = smp_load_acquire(&q->buf->producer_index);
-	index &= q->index_mask;
+	prod &= q->index_mask;
 
-	return index;
+	return prod;
 }
 
 static inline unsigned int consumer_index(struct rxe_queue *q)
 {
-	u32 index;
+	u32 cons;
 
-	/* make sure all changes to queue
-	 * complete before getting consumer index
-	 */
-	index = smp_load_acquire(&q->buf->consumer_index);
-	index &= q->index_mask;
+	if (q->type == QUEUE_TYPE_TO_USER)
+		/* protect user space index */
+		cons = smp_load_acquire(&q->buf->consumer_index);
+	else
+		cons = q->buf->consumer_index;
 
-	return index;
+	cons &= q->index_mask;
+
+	return cons;
 }
 
-static inline void *addr_from_index(struct rxe_queue *q, unsigned int index)
+static inline void *addr_from_index(struct rxe_queue *q,
+				unsigned int index)
 {
 	return q->buf->data + ((index & q->index_mask)
 				<< q->buf->log2_elem_size);
 }
 
 static inline unsigned int index_from_addr(const struct rxe_queue *q,
-					   const void *addr)
+				const void *addr)
 {
 	return (((u8 *)addr - q->buf->data) >> q->log2_elem_size)
-		& q->index_mask;
-}
-
-static inline unsigned int queue_count(const struct rxe_queue *q)
-{
-	return (q->buf->producer_index - q->buf->consumer_index)
-		& q->index_mask;
+				& q->index_mask;
 }
 
 static inline void *queue_head(struct rxe_queue *q)