diff mbox

[v3,11/13] xen/pvcalls: implement poll command

Message ID 1501541855-7354-11-git-send-email-sstabellini@kernel.org (mailing list archive)
State New, archived
Headers show

Commit Message

Stefano Stabellini July 31, 2017, 10:57 p.m. UTC
For active sockets, check the indexes and use the inflight_conn_req
waitqueue to wait.

For passive sockets if an accept is outstanding
(PVCALLS_FLAG_ACCEPT_INFLIGHT), check if it has been answered by looking
at bedata->rsp[req_id]. If so, return POLLIN.  Otherwise use the
inflight_accept_req waitqueue.

If no accepts are inflight, send PVCALLS_POLL to the backend. If we have
outstanding POLL requests awaiting a response, use the inflight_req
waitqueue: inflight_req is awakened when a new response is received; on
wakeup we check whether the POLL response has arrived by looking at the
PVCALLS_FLAG_POLL_RET flag. We set the flag from
pvcalls_front_event_handler, if the response was for a POLL command.

In pvcalls_front_event_handler, get the struct sock_mapping from the
poll id (we previously converted struct sock_mapping* to uint64_t and
used it as id).

Signed-off-by: Stefano Stabellini <stefano@aporeto.com>
CC: boris.ostrovsky@oracle.com
CC: jgross@suse.com
---
 drivers/xen/pvcalls-front.c | 135 +++++++++++++++++++++++++++++++++++++++++---
 drivers/xen/pvcalls-front.h |   3 +
 2 files changed, 129 insertions(+), 9 deletions(-)

Comments

Boris Ostrovsky Aug. 15, 2017, 8:30 p.m. UTC | #1
On 07/31/2017 06:57 PM, Stefano Stabellini wrote:
> For active sockets, check the indexes and use the inflight_conn_req
> waitqueue to wait.
>
> For passive sockets if an accept is outstanding
> (PVCALLS_FLAG_ACCEPT_INFLIGHT), check if it has been answered by looking
> at bedata->rsp[req_id]. If so, return POLLIN.  Otherwise use the
> inflight_accept_req waitqueue.
>
> If no accepts are inflight, send PVCALLS_POLL to the backend. If we have
> outstanding POLL requests awaiting for a response use the inflight_req
> waitqueue: inflight_req is awaken when a new response is received; on
> wakeup we check whether the POLL response is arrived by looking at the
> PVCALLS_FLAG_POLL_RET flag. We set the flag from
> pvcalls_front_event_handler, if the response was for a POLL command.
>
> In pvcalls_front_event_handler, get the struct sock_mapping from the
> poll id (we previously converted struct sock_mapping* to uint64_t and
> used it as id).
>
> Signed-off-by: Stefano Stabellini <stefano@aporeto.com>
> CC: boris.ostrovsky@oracle.com
> CC: jgross@suse.com
> ---
>  drivers/xen/pvcalls-front.c | 135 +++++++++++++++++++++++++++++++++++++++++---
>  drivers/xen/pvcalls-front.h |   3 +
>  2 files changed, 129 insertions(+), 9 deletions(-)
>
> diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
> index 635a83a..1c975d6 100644
> --- a/drivers/xen/pvcalls-front.c
> +++ b/drivers/xen/pvcalls-front.c
> @@ -72,6 +72,8 @@ struct sock_mapping {
>  		 * Only one poll operation can be inflight for a given socket.
>  		 */
>  #define PVCALLS_FLAG_ACCEPT_INFLIGHT 0
> +#define PVCALLS_FLAG_POLL_INFLIGHT   1
> +#define PVCALLS_FLAG_POLL_RET        2
>  			uint8_t flags;
>  			uint32_t inflight_req_id;
>  			struct sock_mapping *accept_map;
> @@ -139,15 +141,32 @@ static irqreturn_t pvcalls_front_event_handler(int irq, void *dev_id)
>  		rsp = RING_GET_RESPONSE(&bedata->ring, bedata->ring.rsp_cons);
>  
>  		req_id = rsp->req_id;
> -		dst = (uint8_t *)&bedata->rsp[req_id] + sizeof(rsp->req_id);
> -		src = (uint8_t *)rsp + sizeof(rsp->req_id);
> -		memcpy(dst, src, sizeof(*rsp) - sizeof(rsp->req_id));
> -		/*
> -		 * First copy the rest of the data, then req_id. It is
> -		 * paired with the barrier when accessing bedata->rsp.
> -		 */
> -		smp_wmb();
> -		WRITE_ONCE(bedata->rsp[req_id].req_id, rsp->req_id);
> +		if (rsp->cmd == PVCALLS_POLL) {
> +			struct sock_mapping *map = (struct sock_mapping *)
> +						   rsp->u.poll.id;
> +
> +			set_bit(PVCALLS_FLAG_POLL_RET,
> +				(void *)&map->passive.flags);
> +			/*
> +			 * Set RET, then clear INFLIGHT. It pairs with
> +			 * the checks at the beginning of
> +			 * pvcalls_front_poll_passive.
> +			 */
> +			smp_wmb();
> +			clear_bit(PVCALLS_FLAG_POLL_INFLIGHT,
> +				  (void *)&map->passive.flags);
> +		} else {
> +			dst = (uint8_t *)&bedata->rsp[req_id] +
> +			      sizeof(rsp->req_id);
> +			src = (uint8_t *)rsp + sizeof(rsp->req_id);
> +			memcpy(dst, src, sizeof(*rsp) - sizeof(rsp->req_id));
> +			/*
> +			 * First copy the rest of the data, then req_id. It is
> +			 * paired with the barrier when accessing bedata->rsp.
> +			 */
> +			smp_wmb();
> +			WRITE_ONCE(bedata->rsp[req_id].req_id, rsp->req_id);
> +		}
>  
>  		done = 1;
>  		bedata->ring.rsp_cons++;
> @@ -736,6 +755,104 @@ int pvcalls_front_accept(struct socket *sock, struct socket *newsock, int flags)
>  	return ret;
>  }
>  
> +static unsigned int pvcalls_front_poll_passive(struct file *file,
> +					       struct pvcalls_bedata *bedata,
> +					       struct sock_mapping *map,
> +					       poll_table *wait)
> +{
> +	int notify, req_id, ret;
> +	struct xen_pvcalls_request *req;
> +

I am a bit confused by the logic here.

> +	if (test_bit(PVCALLS_FLAG_ACCEPT_INFLIGHT,
> +		     (void *)&map->passive.flags)) {
> +		uint32_t req_id = READ_ONCE(map->passive.inflight_req_id);
> +		if (req_id != PVCALLS_INVALID_ID &&
> +		    READ_ONCE(bedata->rsp[req_id].req_id) == req_id)
> +			return POLLIN;

This is successful accept? Why is it returning POLLIN only and not
POLLIN | POLLRDNORM (or POLLOUT, for that matter)?

> +
> +		poll_wait(file, &map->passive.inflight_accept_req, wait);
> +		return 0;
> +	}
> +
> +	if (test_and_clear_bit(PVCALLS_FLAG_POLL_RET,
> +			       (void *)&map->passive.flags))
> +		return POLLIN;

This is previous poll request completing?



> +
> +	/*
> +	 * First check RET, then INFLIGHT. No barriers necessary to
> +	 * ensure execution ordering because of the conditional
> +	 * instructions creating control dependencies.
> +	 */
> +
> +	if (test_and_set_bit(PVCALLS_FLAG_POLL_INFLIGHT,
> +			     (void *)&map->passive.flags)) {
> +		poll_wait(file, &bedata->inflight_req, wait);
> +		return 0;
> +	}

This I don't understand, could you explain?



> +
> +	spin_lock(&bedata->pvcallss_lock);
> +	ret = get_request(bedata, &req_id);
> +	if (ret < 0) {
> +		spin_unlock(&bedata->pvcallss_lock);
> +		return ret;
> +	}
> +	req = RING_GET_REQUEST(&bedata->ring, req_id);
> +	req->req_id = req_id;
> +	req->cmd = PVCALLS_POLL;
> +	req->u.poll.id = (uint64_t) map;
> +
> +	bedata->ring.req_prod_pvt++;
> +	RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&bedata->ring, notify);
> +	spin_unlock(&bedata->pvcallss_lock);
> +	if (notify)
> +		notify_remote_via_irq(bedata->irq);
> +
> +	poll_wait(file, &bedata->inflight_req, wait);
> +	return 0;
> +}
> +
> +static unsigned int pvcalls_front_poll_active(struct file *file,
> +					      struct pvcalls_bedata *bedata,
> +					      struct sock_mapping *map,
> +					      poll_table *wait)
> +{
> +	unsigned int mask = 0;
> +	int32_t in_error, out_error;
> +	struct pvcalls_data_intf *intf = map->active.ring;
> +
> +	out_error = intf->out_error;
> +	in_error = intf->in_error;
> +
> +	poll_wait(file, &map->active.inflight_conn_req, wait);
> +	if (pvcalls_front_write_todo(map))
> +		mask |= POLLOUT | POLLWRNORM;
> +	if (pvcalls_front_read_todo(map))
> +		mask |= POLLIN | POLLRDNORM;
> +	if (in_error != 0 || out_error != 0)
> +		mask |= POLLERR;
> +
> +	return mask;
> +}
> +
> +unsigned int pvcalls_front_poll(struct file *file, struct socket *sock,
> +			       poll_table *wait)
> +{
> +	struct pvcalls_bedata *bedata;
> +	struct sock_mapping *map;
> +
> +	if (!pvcalls_front_dev)
> +		return POLLNVAL;
> +	bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
> +
> +	map = (struct sock_mapping *) READ_ONCE(sock->sk->sk_send_head);

I just noticed this --- why is it READ_ONCE? Are you concerned that
sk_send_head may change?

-boris

> +	if (!map)
> +		return POLLNVAL;
> +	if (map->active_socket)
> +		return pvcalls_front_poll_active(file, bedata, map, wait);
> +	else
> +		return pvcalls_front_poll_passive(file, bedata, map, wait);
> +}
> +
>  static const struct xenbus_device_id pvcalls_front_ids[] = {
>  	{ "pvcalls" },
>  	{ "" }
> diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
> index de24041..25e05b8 100644
> --- a/drivers/xen/pvcalls-front.h
> +++ b/drivers/xen/pvcalls-front.h
> @@ -20,5 +20,8 @@ int pvcalls_front_recvmsg(struct socket *sock,
>  			  struct msghdr *msg,
>  			  size_t len,
>  			  int flags);
> +unsigned int pvcalls_front_poll(struct file *file,
> +				struct socket *sock,
> +				poll_table *wait);
>  
>  #endif
Stefano Stabellini Sept. 8, 2017, 11:03 p.m. UTC | #2
On Tue, 15 Aug 2017, Boris Ostrovsky wrote:
> > @@ -736,6 +755,104 @@ int pvcalls_front_accept(struct socket *sock, struct socket *newsock, int flags)
> >  	return ret;
> >  }
> >  
> > +static unsigned int pvcalls_front_poll_passive(struct file *file,
> > +					       struct pvcalls_bedata *bedata,
> > +					       struct sock_mapping *map,
> > +					       poll_table *wait)
> > +{
> > +	int notify, req_id, ret;
> > +	struct xen_pvcalls_request *req;
> > +
> 
> I am a bit confused by the logic here.
> 
> > +	if (test_bit(PVCALLS_FLAG_ACCEPT_INFLIGHT,
> > +		     (void *)&map->passive.flags)) {
> > +		uint32_t req_id = READ_ONCE(map->passive.inflight_req_id);
> > +		if (req_id != PVCALLS_INVALID_ID &&
> > +		    READ_ONCE(bedata->rsp[req_id].req_id) == req_id)
> > +			return POLLIN;
> 
> This is successful accept?

Yes: if an accept reply is pending, there must be something to read on
the passive socket.


> Why is it returning POLLIN only and not
> POLLIN | POLLRDNORM (or POLLOUT, for that matter)?

Keep in mind that this is a passive socket. There is no writing to
it. But yes, POLLIN and POLLRDNORM are equivalent, so it is best to
set both. I'll do that.


> > +
> > +		poll_wait(file, &map->passive.inflight_accept_req, wait);
> > +		return 0;
> > +	}
> > +
> > +	if (test_and_clear_bit(PVCALLS_FLAG_POLL_RET,
> > +			       (void *)&map->passive.flags))
> > +		return POLLIN;
> 
> This is previous poll request completing?

Yes, a previous POLL request completing.


> > +
> > +	/*
> > +	 * First check RET, then INFLIGHT. No barriers necessary to
> > +	 * ensure execution ordering because of the conditional
> > +	 * instructions creating control dependencies.
> > +	 */
> > +
> > +	if (test_and_set_bit(PVCALLS_FLAG_POLL_INFLIGHT,
> > +			     (void *)&map->passive.flags)) {
> > +		poll_wait(file, &bedata->inflight_req, wait);
> > +		return 0;
> > +	}
> 
> This I don't understand, could you explain?

If none of the conditions above apply (there isn't an
accept reply ready, and there isn't a poll reply ready) then it means
that we actually need to tell the caller to wait. This is done by
calling poll_wait to add a wait_queue to the poll table.

Then we need to tell the backend that we are polling, that is done by
the code below that sends a PVCALLS_POLL message to the backend.

The check on PVCALLS_FLAG_POLL_INFLIGHT here is to avoid sending
multiple PVCALLS_POLL requests to the backend. Thus, if there is one
already in-flight, we can just call poll_wait and return immediately
without sending a second request.
 
 
> > +
> > +	spin_lock(&bedata->pvcallss_lock);
> > +	ret = get_request(bedata, &req_id);
> > +	if (ret < 0) {
> > +		spin_unlock(&bedata->pvcallss_lock);
> > +		return ret;
> > +	}
> > +	req = RING_GET_REQUEST(&bedata->ring, req_id);
> > +	req->req_id = req_id;
> > +	req->cmd = PVCALLS_POLL;
> > +	req->u.poll.id = (uint64_t) map;
> > +
> > +	bedata->ring.req_prod_pvt++;
> > +	RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&bedata->ring, notify);
> > +	spin_unlock(&bedata->pvcallss_lock);
> > +	if (notify)
> > +		notify_remote_via_irq(bedata->irq);
> > +
> > +	poll_wait(file, &bedata->inflight_req, wait);
> > +	return 0;
> > +}
> > +
> > +static unsigned int pvcalls_front_poll_active(struct file *file,
> > +					      struct pvcalls_bedata *bedata,
> > +					      struct sock_mapping *map,
> > +					      poll_table *wait)
> > +{
> > +	unsigned int mask = 0;
> > +	int32_t in_error, out_error;
> > +	struct pvcalls_data_intf *intf = map->active.ring;
> > +
> > +	out_error = intf->out_error;
> > +	in_error = intf->in_error;
> > +
> > +	poll_wait(file, &map->active.inflight_conn_req, wait);
> > +	if (pvcalls_front_write_todo(map))
> > +		mask |= POLLOUT | POLLWRNORM;
> > +	if (pvcalls_front_read_todo(map))
> > +		mask |= POLLIN | POLLRDNORM;
> > +	if (in_error != 0 || out_error != 0)
> > +		mask |= POLLERR;
> > +
> > +	return mask;
> > +}
> > +
> > +unsigned int pvcalls_front_poll(struct file *file, struct socket *sock,
> > +			       poll_table *wait)
> > +{
> > +	struct pvcalls_bedata *bedata;
> > +	struct sock_mapping *map;
> > +
> > +	if (!pvcalls_front_dev)
> > +		return POLLNVAL;
> > +	bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
> > +
> > +	map = (struct sock_mapping *) READ_ONCE(sock->sk->sk_send_head);
> 
> I just noticed this --- why is it READ_ONCE? Are you concerned that
> sk_send_head may change?

No, but I wanted to avoid partial reads. A caller could call
pvcalls_front_accept and pvcalls_front_poll on newsock almost at the
same time (it is probably not the correct way to use the API), I wanted
to make sure that "map" is either read correctly, or not read at all.


> > +	if (!map)
> > +		return POLLNVAL;
> > +	if (map->active_socket)
> > +		return pvcalls_front_poll_active(file, bedata, map, wait);
> > +	else
> > +		return pvcalls_front_poll_passive(file, bedata, map, wait);
> > +}
> > +
> >  static const struct xenbus_device_id pvcalls_front_ids[] = {
> >  	{ "pvcalls" },
> >  	{ "" }
> > diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
> > index de24041..25e05b8 100644
> > --- a/drivers/xen/pvcalls-front.h
> > +++ b/drivers/xen/pvcalls-front.h
> > @@ -20,5 +20,8 @@ int pvcalls_front_recvmsg(struct socket *sock,
> >  			  struct msghdr *msg,
> >  			  size_t len,
> >  			  int flags);
> > +unsigned int pvcalls_front_poll(struct file *file,
> > +				struct socket *sock,
> > +				poll_table *wait);
> >  
> >  #endif
>
Boris Ostrovsky Sept. 12, 2017, 2:14 p.m. UTC | #3
>>> +
>>> +unsigned int pvcalls_front_poll(struct file *file, struct socket *sock,
>>> +			       poll_table *wait)
>>> +{
>>> +	struct pvcalls_bedata *bedata;
>>> +	struct sock_mapping *map;
>>> +
>>> +	if (!pvcalls_front_dev)
>>> +		return POLLNVAL;
>>> +	bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
>>> +
>>> +	map = (struct sock_mapping *) READ_ONCE(sock->sk->sk_send_head);
>> I just noticed this --- why is it READ_ONCE? Are you concerned that
>> sk_send_head may change?
> No, but I wanted to avoid partial reads. A caller could call
> pvcalls_front_accept and pvcalls_front_poll on newsock almost at the
> same time (it is probably not the correct way to use the API), I wanted
> to make sure that "map" is either read correctly, or not read at all.

How can you have a partial read on a pointer?

-boris
Stefano Stabellini Sept. 12, 2017, 10:17 p.m. UTC | #4
On Tue, 12 Sep 2017, Boris Ostrovsky wrote:
> >>> +
> >>> +unsigned int pvcalls_front_poll(struct file *file, struct socket *sock,
> >>> +			       poll_table *wait)
> >>> +{
> >>> +	struct pvcalls_bedata *bedata;
> >>> +	struct sock_mapping *map;
> >>> +
> >>> +	if (!pvcalls_front_dev)
> >>> +		return POLLNVAL;
> >>> +	bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
> >>> +
> >>> +	map = (struct sock_mapping *) READ_ONCE(sock->sk->sk_send_head);
> >> I just noticed this --- why is it READ_ONCE? Are you concerned that
> >> sk_send_head may change?
> > No, but I wanted to avoid partial reads. A caller could call
> > pvcalls_front_accept and pvcalls_front_poll on newsock almost at the
> > same time (it is probably not the correct way to use the API), I wanted
> > to make sure that "map" is either read correctly, or not read at all.
> 
> How can you have a partial read on a pointer?

I don't think that the compiler makes any promises on translating a
pointer read into a single read instruction. Of course, I expect gcc to
actually do it without any need for READ/WRITE_ONCE.
Boris Ostrovsky Sept. 12, 2017, 11:04 p.m. UTC | #5
On 09/12/2017 06:17 PM, Stefano Stabellini wrote:
> On Tue, 12 Sep 2017, Boris Ostrovsky wrote:
>>>>> +
>>>>> +unsigned int pvcalls_front_poll(struct file *file, struct socket *sock,
>>>>> +			       poll_table *wait)
>>>>> +{
>>>>> +	struct pvcalls_bedata *bedata;
>>>>> +	struct sock_mapping *map;
>>>>> +
>>>>> +	if (!pvcalls_front_dev)
>>>>> +		return POLLNVAL;
>>>>> +	bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
>>>>> +
>>>>> +	map = (struct sock_mapping *) READ_ONCE(sock->sk->sk_send_head);
>>>> I just noticed this --- why is it READ_ONCE? Are you concerned that
>>>> sk_send_head may change?
>>> No, but I wanted to avoid partial reads. A caller could call
>>> pvcalls_front_accept and pvcalls_front_poll on newsock almost at the
>>> same time (it is probably not the correct way to use the API), I wanted
>>> to make sure that "map" is either read correctly, or not read at all.
>> How can you have a partial read on a pointer?
> I don't think that the compiler makes any promises on translating a
> pointer read into a single read instruction. Of couse, I expect gcc to
> actually do it without any need for READ/WRITE_ONCE.

READ_ONCE() only guarantees ordering but not atomicity. It resolves (for
64-bit pointers) to

        case 8: *(__u64 *)res = *(volatile __u64 *)p; break;

so if compiler was breaking accesses into two then nothing would have
prevented it from breaking them here (I don't think volatile declaration
would affect this). Moreover, for sizes >8 bytes  READ_ONCE() is
__builtin_memcpy() which is definitely not atomic.

So you can't rely on READ_ONCE being atomic from that perspective.

OTOH, I am pretty sure pointer accesses are guaranteed to be atomic. For
example, atomic64_read() is READ_ONCE(u64), which (per above) is
dereferencing of a 64-bit pointer in C.

-boris
Stefano Stabellini Sept. 12, 2017, 11:13 p.m. UTC | #6
On Tue, 12 Sep 2017, Boris Ostrovsky wrote:
> On 09/12/2017 06:17 PM, Stefano Stabellini wrote:
> > On Tue, 12 Sep 2017, Boris Ostrovsky wrote:
> >>>>> +
> >>>>> +unsigned int pvcalls_front_poll(struct file *file, struct socket *sock,
> >>>>> +			       poll_table *wait)
> >>>>> +{
> >>>>> +	struct pvcalls_bedata *bedata;
> >>>>> +	struct sock_mapping *map;
> >>>>> +
> >>>>> +	if (!pvcalls_front_dev)
> >>>>> +		return POLLNVAL;
> >>>>> +	bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
> >>>>> +
> >>>>> +	map = (struct sock_mapping *) READ_ONCE(sock->sk->sk_send_head);
> >>>> I just noticed this --- why is it READ_ONCE? Are you concerned that
> >>>> sk_send_head may change?
> >>> No, but I wanted to avoid partial reads. A caller could call
> >>> pvcalls_front_accept and pvcalls_front_poll on newsock almost at the
> >>> same time (it is probably not the correct way to use the API), I wanted
> >>> to make sure that "map" is either read correctly, or not read at all.
> >> How can you have a partial read on a pointer?
> > I don't think that the compiler makes any promises on translating a
> > pointer read into a single read instruction. Of couse, I expect gcc to
> > actually do it without any need for READ/WRITE_ONCE.
> 
> READ_ONCE() only guarantees ordering but not atomicity. It resolves (for
> 64-bit pointers) to
> 
>         case 8: *(__u64 *)res = *(volatile __u64 *)p; break;
> 
> so if compiler was breaking accesses into two then nothing would have
> prevented it from breaking them here (I don't think volatile declaration
> would affect this). Moreover, for sizes >8 bytes  READ_ONCE() is
> __builtin_memcpy() which is definitely not atomic.
> 
> So you can't rely on READ_ONCE being atomic from that perspective.

I thought that READ_ONCE guaranteed atomicity for sizes less than or
equal to the machine word size. It doesn't make any atomicity guarantees
for sizes >8 bytes.


> OTOH, I am pretty sure pointer accesses are guaranteed to be atomic. For
> example, atomic64_read() is READ_ONCE(u64), which (per above) is
> dereferencing of a 64-bit pointer in C.

I am happy to remove the READ_ONCE and WRITE_ONCE, if we all think it is
safe.
Stefano Stabellini Sept. 12, 2017, 11:18 p.m. UTC | #7
On Tue, 12 Sep 2017, Stefano Stabellini wrote:
> On Tue, 12 Sep 2017, Boris Ostrovsky wrote:
> > On 09/12/2017 06:17 PM, Stefano Stabellini wrote:
> > > On Tue, 12 Sep 2017, Boris Ostrovsky wrote:
> > >>>>> +
> > >>>>> +unsigned int pvcalls_front_poll(struct file *file, struct socket *sock,
> > >>>>> +			       poll_table *wait)
> > >>>>> +{
> > >>>>> +	struct pvcalls_bedata *bedata;
> > >>>>> +	struct sock_mapping *map;
> > >>>>> +
> > >>>>> +	if (!pvcalls_front_dev)
> > >>>>> +		return POLLNVAL;
> > >>>>> +	bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
> > >>>>> +
> > >>>>> +	map = (struct sock_mapping *) READ_ONCE(sock->sk->sk_send_head);
> > >>>> I just noticed this --- why is it READ_ONCE? Are you concerned that
> > >>>> sk_send_head may change?
> > >>> No, but I wanted to avoid partial reads. A caller could call
> > >>> pvcalls_front_accept and pvcalls_front_poll on newsock almost at the
> > >>> same time (it is probably not the correct way to use the API), I wanted
> > >>> to make sure that "map" is either read correctly, or not read at all.
> > >> How can you have a partial read on a pointer?
> > > I don't think that the compiler makes any promises on translating a
> > > pointer read into a single read instruction. Of couse, I expect gcc to
> > > actually do it without any need for READ/WRITE_ONCE.
> > 
> > READ_ONCE() only guarantees ordering but not atomicity. It resolves (for
> > 64-bit pointers) to
> > 
> >         case 8: *(__u64 *)res = *(volatile __u64 *)p; break;
> > 
> > so if compiler was breaking accesses into two then nothing would have
> > prevented it from breaking them here (I don't think volatile declaration
> > would affect this). Moreover, for sizes >8 bytes  READ_ONCE() is
> > __builtin_memcpy() which is definitely not atomic.
> > 
> > So you can't rely on READ_ONCE being atomic from that perspective.
> 
> I thought that READ_ONCE guaranteed atomicity for sizes less or equal to
> the machine word size. It doesn't make any atomicity guarantees for
> sizes >8 bytes.
> 
> 
> > OTOH, I am pretty sure pointer accesses are guaranteed to be atomic. For
> > example, atomic64_read() is READ_ONCE(u64), which (per above) is
> > dereferencing of a 64-bit pointer in C.
> 
> I am happy to remove the READ_ONCE and WRITE_ONCE, if we all think it is
> safe.

Looking at other code in Linux, it seems that they are making this
assumption in many places. I'll remove READ/WRITE_ONCE.
diff mbox

Patch

diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
index 635a83a..1c975d6 100644
--- a/drivers/xen/pvcalls-front.c
+++ b/drivers/xen/pvcalls-front.c
@@ -72,6 +72,8 @@  struct sock_mapping {
 		 * Only one poll operation can be inflight for a given socket.
 		 */
 #define PVCALLS_FLAG_ACCEPT_INFLIGHT 0
+#define PVCALLS_FLAG_POLL_INFLIGHT   1
+#define PVCALLS_FLAG_POLL_RET        2
 			uint8_t flags;
 			uint32_t inflight_req_id;
 			struct sock_mapping *accept_map;
@@ -139,15 +141,32 @@  static irqreturn_t pvcalls_front_event_handler(int irq, void *dev_id)
 		rsp = RING_GET_RESPONSE(&bedata->ring, bedata->ring.rsp_cons);
 
 		req_id = rsp->req_id;
-		dst = (uint8_t *)&bedata->rsp[req_id] + sizeof(rsp->req_id);
-		src = (uint8_t *)rsp + sizeof(rsp->req_id);
-		memcpy(dst, src, sizeof(*rsp) - sizeof(rsp->req_id));
-		/*
-		 * First copy the rest of the data, then req_id. It is
-		 * paired with the barrier when accessing bedata->rsp.
-		 */
-		smp_wmb();
-		WRITE_ONCE(bedata->rsp[req_id].req_id, rsp->req_id);
+		if (rsp->cmd == PVCALLS_POLL) {
+			struct sock_mapping *map = (struct sock_mapping *)
+						   rsp->u.poll.id;
+
+			set_bit(PVCALLS_FLAG_POLL_RET,
+				(void *)&map->passive.flags);
+			/*
+			 * Set RET, then clear INFLIGHT. It pairs with
+			 * the checks at the beginning of
+			 * pvcalls_front_poll_passive.
+			 */
+			smp_wmb();
+			clear_bit(PVCALLS_FLAG_POLL_INFLIGHT,
+				  (void *)&map->passive.flags);
+		} else {
+			dst = (uint8_t *)&bedata->rsp[req_id] +
+			      sizeof(rsp->req_id);
+			src = (uint8_t *)rsp + sizeof(rsp->req_id);
+			memcpy(dst, src, sizeof(*rsp) - sizeof(rsp->req_id));
+			/*
+			 * First copy the rest of the data, then req_id. It is
+			 * paired with the barrier when accessing bedata->rsp.
+			 */
+			smp_wmb();
+			WRITE_ONCE(bedata->rsp[req_id].req_id, rsp->req_id);
+		}
 
 		done = 1;
 		bedata->ring.rsp_cons++;
@@ -736,6 +755,104 @@  int pvcalls_front_accept(struct socket *sock, struct socket *newsock, int flags)
 	return ret;
 }
 
+static unsigned int pvcalls_front_poll_passive(struct file *file,
+					       struct pvcalls_bedata *bedata,
+					       struct sock_mapping *map,
+					       poll_table *wait)
+{
+	int notify, req_id, ret;
+	struct xen_pvcalls_request *req;
+
+	if (test_bit(PVCALLS_FLAG_ACCEPT_INFLIGHT,
+		     (void *)&map->passive.flags)) {
+		uint32_t req_id = READ_ONCE(map->passive.inflight_req_id);
+		if (req_id != PVCALLS_INVALID_ID &&
+		    READ_ONCE(bedata->rsp[req_id].req_id) == req_id)
+			return POLLIN;
+
+		poll_wait(file, &map->passive.inflight_accept_req, wait);
+		return 0;
+	}
+
+	if (test_and_clear_bit(PVCALLS_FLAG_POLL_RET,
+			       (void *)&map->passive.flags))
+		return POLLIN;
+
+	/*
+	 * First check RET, then INFLIGHT. No barriers necessary to
+	 * ensure execution ordering because of the conditional
+	 * instructions creating control dependencies.
+	 */
+
+	if (test_and_set_bit(PVCALLS_FLAG_POLL_INFLIGHT,
+			     (void *)&map->passive.flags)) {
+		poll_wait(file, &bedata->inflight_req, wait);
+		return 0;
+	}
+
+	spin_lock(&bedata->pvcallss_lock);
+	ret = get_request(bedata, &req_id);
+	if (ret < 0) {
+		spin_unlock(&bedata->pvcallss_lock);
+		return ret;
+	}
+	req = RING_GET_REQUEST(&bedata->ring, req_id);
+	req->req_id = req_id;
+	req->cmd = PVCALLS_POLL;
+	req->u.poll.id = (uint64_t) map;
+
+	bedata->ring.req_prod_pvt++;
+	RING_PUSH_REQUESTS_AND_CHECK_NOTIFY(&bedata->ring, notify);
+	spin_unlock(&bedata->pvcallss_lock);
+	if (notify)
+		notify_remote_via_irq(bedata->irq);
+
+	poll_wait(file, &bedata->inflight_req, wait);
+	return 0;
+}
+
+static unsigned int pvcalls_front_poll_active(struct file *file,
+					      struct pvcalls_bedata *bedata,
+					      struct sock_mapping *map,
+					      poll_table *wait)
+{
+	unsigned int mask = 0;
+	int32_t in_error, out_error;
+	struct pvcalls_data_intf *intf = map->active.ring;
+
+	out_error = intf->out_error;
+	in_error = intf->in_error;
+
+	poll_wait(file, &map->active.inflight_conn_req, wait);
+	if (pvcalls_front_write_todo(map))
+		mask |= POLLOUT | POLLWRNORM;
+	if (pvcalls_front_read_todo(map))
+		mask |= POLLIN | POLLRDNORM;
+	if (in_error != 0 || out_error != 0)
+		mask |= POLLERR;
+
+	return mask;
+}
+
+unsigned int pvcalls_front_poll(struct file *file, struct socket *sock,
+			       poll_table *wait)
+{
+	struct pvcalls_bedata *bedata;
+	struct sock_mapping *map;
+
+	if (!pvcalls_front_dev)
+		return POLLNVAL;
+	bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
+
+	map = (struct sock_mapping *) READ_ONCE(sock->sk->sk_send_head);
+	if (!map)
+		return POLLNVAL;
+	if (map->active_socket)
+		return pvcalls_front_poll_active(file, bedata, map, wait);
+	else
+		return pvcalls_front_poll_passive(file, bedata, map, wait);
+}
+
 static const struct xenbus_device_id pvcalls_front_ids[] = {
 	{ "pvcalls" },
 	{ "" }
diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
index de24041..25e05b8 100644
--- a/drivers/xen/pvcalls-front.h
+++ b/drivers/xen/pvcalls-front.h
@@ -20,5 +20,8 @@  int pvcalls_front_recvmsg(struct socket *sock,
 			  struct msghdr *msg,
 			  size_t len,
 			  int flags);
+unsigned int pvcalls_front_poll(struct file *file,
+				struct socket *sock,
+				poll_table *wait);
 
 #endif