
[v2,09/13] xen/pvcalls: implement recvmsg

Message ID 1501017730-12797-9-git-send-email-sstabellini@kernel.org (mailing list archive)
State New, archived

Commit Message

Stefano Stabellini July 25, 2017, 9:22 p.m. UTC
Implement recvmsg by copying data from the "in" ring. If not enough data
is available and the recvmsg call is blocking, then wait on the
inflight_conn_req waitqueue. Take the active socket in_mutex so that
only one function can access the ring at any given time.

If not enough data is available on the ring, rather than returning
immediately or sleep-waiting, spin for up to 5000 cycles. This small
optimization turns out to improve performance and latency significantly.

Signed-off-by: Stefano Stabellini <stefano@aporeto.com>
CC: boris.ostrovsky@oracle.com
CC: jgross@suse.com
---
 drivers/xen/pvcalls-front.c | 106 ++++++++++++++++++++++++++++++++++++++++++++
 drivers/xen/pvcalls-front.h |   4 ++
 2 files changed, 110 insertions(+)
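
For readers of __read_ring() below: the patch relies on two small ring-index helpers, pvcalls_mask() and pvcalls_queued(), that are defined elsewhere in the driver and not shown in this diff. A minimal sketch of what they typically look like, assuming a power-of-two ring size and free-running 32-bit indices (illustrative only; the in-tree definitions may differ in detail):

/* Illustrative sketch of the index helpers used by __read_ring(); the real
 * definitions are not part of this diff.  RING_IDX is an unsigned 32-bit
 * index from the Xen ring headers.
 */
static inline RING_IDX pvcalls_mask(RING_IDX idx, RING_IDX ring_size)
{
	/* ring_size is XEN_FLEX_RING_SIZE(order), always a power of two */
	return idx & (ring_size - 1);
}

static inline RING_IDX pvcalls_queued(RING_IDX prod, RING_IDX cons,
				      RING_IDX ring_size)
{
	/* bytes written by the backend and not yet consumed; prod and cons
	 * are free-running, so unsigned subtraction handles wrap-around
	 */
	return prod - cons;
}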

Comments

Boris Ostrovsky July 26, 2017, 9:21 p.m. UTC | #1
On 07/25/2017 05:22 PM, Stefano Stabellini wrote:
> Implement recvmsg by copying data from the "in" ring. If not enough data
> is available and the recvmsg call is blocking, then wait on the
> inflight_conn_req waitqueue. Take the active socket in_mutex so that
> only one function can access the ring at any given time.
>
> If not enough data is available on the ring, rather than returning
> immediately or sleep-waiting, spin for up to 5000 cycles. This small
> optimization turns out to improve performance and latency significantly.
>
> Signed-off-by: Stefano Stabellini <stefano@aporeto.com>
> CC: boris.ostrovsky@oracle.com
> CC: jgross@suse.com
> ---
>  drivers/xen/pvcalls-front.c | 106 ++++++++++++++++++++++++++++++++++++++++++++
>  drivers/xen/pvcalls-front.h |   4 ++
>  2 files changed, 110 insertions(+)
>
> diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
> index d8ed280..b4ca569 100644
> --- a/drivers/xen/pvcalls-front.c
> +++ b/drivers/xen/pvcalls-front.c
> @@ -96,6 +96,20 @@ static int pvcalls_front_write_todo(struct sock_mapping *map)
>  	return size - pvcalls_queued(prod, cons, size);
>  }
>  
> +static bool pvcalls_front_read_todo(struct sock_mapping *map)
> +{
> +	struct pvcalls_data_intf *intf = map->active.ring;
> +	RING_IDX cons, prod;
> +	int32_t error;
> +
> +	cons = intf->in_cons;
> +	prod = intf->in_prod;
> +	error = intf->in_error;
> +	return (error != 0 ||
> +		pvcalls_queued(prod, cons,
> +			       XEN_FLEX_RING_SIZE(intf->ring_order))) != 0;
> +}
> +
>  static irqreturn_t pvcalls_front_event_handler(int irq, void *dev_id)
>  {
>  	struct xenbus_device *dev = dev_id;
> @@ -418,6 +432,98 @@ int pvcalls_front_sendmsg(struct socket *sock, struct msghdr *msg,
>  	return tot_sent;
>  }
>  
> +static int __read_ring(struct pvcalls_data_intf *intf,
> +		       struct pvcalls_data *data,
> +		       struct iov_iter *msg_iter,
> +		       size_t len, int flags)
> +{
> +	RING_IDX cons, prod, size, masked_prod, masked_cons;
> +	RING_IDX array_size = XEN_FLEX_RING_SIZE(intf->ring_order);
> +	int32_t error;
> +
> +	cons = intf->in_cons;
> +	prod = intf->in_prod;
> +	error = intf->in_error;
> +	/* get pointers before reading from the ring */
> +	virt_rmb();
> +	if (error < 0)
> +		return error;
> +
> +	size = pvcalls_queued(prod, cons, array_size);
> +	masked_prod = pvcalls_mask(prod, array_size);
> +	masked_cons = pvcalls_mask(cons, array_size);
> +
> +	if (size == 0)
> +		return 0;
> +
> +	if (len > size)
> +		len = size;
> +
> +	if (masked_prod > masked_cons) {
> +		copy_to_iter(data->in + masked_cons, len, msg_iter);
> +	} else {
> +		if (len > (array_size - masked_cons)) {
> +			copy_to_iter(data->in + masked_cons,
> +				     array_size - masked_cons, msg_iter);
> +			copy_to_iter(data->in,
> +				     len - (array_size - masked_cons),
> +				     msg_iter);
> +		} else {
> +			copy_to_iter(data->in + masked_cons, len, msg_iter);
> +		}
> +	}
> +	/* read data from the ring before increasing the index */
> +	virt_mb();
> +	if (!(flags & MSG_PEEK))
> +		intf->in_cons += len;
> +
> +	return len;
> +}
> +
> +int pvcalls_front_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
> +		     int flags)
> +{
> +	struct pvcalls_bedata *bedata;
> +	int ret = -EAGAIN;
> +	struct sock_mapping *map;
> +	int count = 0;
> +
> +	if (!pvcalls_front_dev)
> +		return -ENOTCONN;
> +	bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
> +
> +	map = (struct sock_mapping *) READ_ONCE(sock->sk->sk_send_head);
> +	if (!map)
> +		return -ENOTSOCK;
> +
> +	if (flags & (MSG_CMSG_CLOEXEC|MSG_ERRQUEUE|MSG_OOB|MSG_TRUNC))
> +		return -EOPNOTSUPP;
> +
> +	mutex_lock(&map->active.in_mutex);
> +	if (len > XEN_FLEX_RING_SIZE(map->active.ring->ring_order))
> +		len = XEN_FLEX_RING_SIZE(map->active.ring->ring_order);
> +
> +	while (!(flags & MSG_DONTWAIT) && !pvcalls_front_read_todo(map)) {
> +		if (count < PVCALLS_FRONT_MAX_SPIN)
> +			count++;
> +		else
> +			wait_event_interruptible(map->active.inflight_conn_req,
> +						 pvcalls_front_read_todo(map));
> +	}

Should we be using PVCALLS_FRONT_MAX_SPIN here? In sendmsg it is
counting non-sleeping iterations but here we are sleeping so
PVCALLS_FRONT_MAX_SPIN (5000) may take a while.

In fact, why shouldn't this waiting be a function of MSG_DONTWAIT
and/or socket's O_NONBLOCK?

-boris


> +	ret = __read_ring(map->active.ring, &map->active.data,
> +			  &msg->msg_iter, len, flags);
> +
> +	if (ret > 0)
> +		notify_remote_via_irq(map->active.irq);
> +	if (ret == 0)
> +		ret = -EAGAIN;
> +	if (ret == -ENOTCONN)
> +		ret = 0;
> +
> +	mutex_unlock(&map->active.in_mutex);
> +	return ret;
> +}
> +
>  int pvcalls_front_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
>  {
>  	struct pvcalls_bedata *bedata;
> diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
> index d937c24..de24041 100644
> --- a/drivers/xen/pvcalls-front.h
> +++ b/drivers/xen/pvcalls-front.h
> @@ -16,5 +16,9 @@ int pvcalls_front_accept(struct socket *sock,
>  int pvcalls_front_sendmsg(struct socket *sock,
>  			  struct msghdr *msg,
>  			  size_t len);
> +int pvcalls_front_recvmsg(struct socket *sock,
> +			  struct msghdr *msg,
> +			  size_t len,
> +			  int flags);
>  
>  #endif
Boris Ostrovsky July 26, 2017, 9:33 p.m. UTC | #2
>> +	while (!(flags & MSG_DONTWAIT) && !pvcalls_front_read_todo(map)) {
>> +		if (count < PVCALLS_FRONT_MAX_SPIN)
>> +			count++;
>> +		else
>> +			wait_event_interruptible(map->active.inflight_conn_req,
>> +						 pvcalls_front_read_todo(map));
>> +	}
> Should we be using PVCALLS_FRONT_MAX_SPIN here? In sendmsg it is
> counting non-sleeping iterations but here we are sleeping so
> PVCALLS_FRONT_MAX_SPIN (5000) may take a while.
>
> In fact, why shouldn't this waiting be a function of MSG_DONTWAIT

err, which it already is. But the question still stands (except for
MSG_DONTWAIT).

-boris

> and/or socket's O_NONBLOCK?
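
To make the question concrete, a sketch of a wait that honours the socket's O_NONBLOCK flag as well as MSG_DONTWAIT is shown below. This is only an illustration of the suggestion, not code from the series: it assumes sock->file is valid for pvcalls sockets, and the socket core usually translates O_NONBLOCK into MSG_DONTWAIT before calling the protocol's recvmsg, so the explicit check may be redundant.

/* Sketch only, not part of the submitted patch: return early for
 * non-blocking callers instead of spinning or sleeping.
 */
bool nonblock = (flags & MSG_DONTWAIT) ||
		(sock->file && (sock->file->f_flags & O_NONBLOCK));

if (nonblock && !pvcalls_front_read_todo(map)) {
	mutex_unlock(&map->active.in_mutex);
	return -EAGAIN;
}

while (!pvcalls_front_read_todo(map)) {
	if (wait_event_interruptible(map->active.inflight_conn_req,
				     pvcalls_front_read_todo(map)))
		break;	/* interrupted by a signal */
}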
Stefano Stabellini July 27, 2017, 12:08 a.m. UTC | #3
On Wed, 26 Jul 2017, Boris Ostrovsky wrote:
> >> +			count++;
> >> +		else
> >> +			wait_event_interruptible(map->active.inflight_conn_req,
> >> +						 pvcalls_front_read_todo(map));
> >> +	}
> > Should we be using PVCALLS_FRONT_MAX_SPIN here? In sendmsg it is
> > counting non-sleeping iterations but here we are sleeping so
> > PVCALLS_FRONT_MAX_SPIN (5000) may take a while.
> >
> > In fact, why shouldn't this waiting be a function of MSG_DONTWAIT
> 
> err, which it already is. But the question still stands (except for
> MSG_DONTWAIT).

The code (admittedly unintuitive) is busy-looping (non-sleeping) for
5000 iterations *before* attempting to sleep. So in that regard, recvmsg
and sendmsg use PVCALLS_FRONT_MAX_SPIN in the same way: only for
non-sleeping iterations.
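
Annotated for clarity, the loop from the patch (logic unchanged, comments added) reads:

while (!(flags & MSG_DONTWAIT) && !pvcalls_front_read_todo(map)) {
	if (count < PVCALLS_FRONT_MAX_SPIN)
		count++;	/* phase 1: busy-spin and re-check the ring */
	else
		/* phase 2: after PVCALLS_FRONT_MAX_SPIN (5000) empty passes,
		 * sleep until the backend signals data or an error
		 */
		wait_event_interruptible(map->active.inflight_conn_req,
					 pvcalls_front_read_todo(map));
}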
Boris Ostrovsky July 27, 2017, 2:59 p.m. UTC | #4
On 07/26/2017 08:08 PM, Stefano Stabellini wrote:
> On Wed, 26 Jul 2017, Boris Ostrovsky wrote:
>>>> +			count++;
>>>> +		else
>>>> +			wait_event_interruptible(map->active.inflight_conn_req,
>>>> +						 pvcalls_front_read_todo(map));
>>>> +	}
>>> Should we be using PVCALLS_FRONT_MAX_SPIN here? In sendmsg it is
>>> counting non-sleeping iterations but here we are sleeping so
>>> PVCALLS_FRONT_MAX_SPIN (5000) may take a while.
>>>
>>> In fact, why shouldn't this waiting be a function of MSG_DONTWAIT
>> err, which it already is. But the question still stands (except for
>> MSG_DONTWAIT).
> The code (admittedly unintuitive) is busy-looping (non-sleeping) for
> 5000 iterations *before* attempting to sleep. So in that regard, recvmsg
> and sendmsg use PVCALLS_FRONT_MAX_SPIN in the same way: only for
> non-sleeping iterations.
>

OK.

Why not go directly into wait_event_interruptible()? I see you write in
the commit message

If not enough data is available on the ring, rather than returning
immediately or sleep-waiting, spin for up to 5000 cycles. This small
optimization turns out to improve performance and latency significantly.


Is this because of scheduling latency? I think this should be mentioned not just in the commit message but also as a comment in the code.

(I also think it's not "not enough data" but rather "no data"?)



-boris
Stefano Stabellini July 31, 2017, 10:26 p.m. UTC | #5
On Thu, 27 Jul 2017, Boris Ostrovsky wrote:
> On 07/26/2017 08:08 PM, Stefano Stabellini wrote:
> > On Wed, 26 Jul 2017, Boris Ostrovsky wrote:
> >>>> +			count++;
> >>>> +		else
> >>>> +			wait_event_interruptible(map->active.inflight_conn_req,
> >>>> +						 pvcalls_front_read_todo(map));
> >>>> +	}
> >>> Should we be using PVCALLS_FRONT_MAX_SPIN here? In sendmsg it is
> >>> counting non-sleeping iterations but here we are sleeping so
> >>> PVCALLS_FRONT_MAX_SPIN (5000) may take a while.
> >>>
> >>> In fact, why shouldn't this waiting be a function of MSG_DONTWAIT
> >> err, which it already is. But the question still stands (except for
> >> MSG_DONTWAIT).
> > The code (admittedly unintuitive) is busy-looping (non-sleeping) for
> > 5000 iterations *before* attempting to sleep. So in that regard, recvmsg
> > and sendmsg use PVCALLS_FRONT_MAX_SPIN in the same way: only for
> > non-sleeping iterations.
> >
> 
> OK.
> 
> Why not go directly into wait_event_interruptible()? I see you write in
> the commit message
> 
> If not enough data is available on the ring, rather than returning
> immediately or sleep-waiting, spin for up to 5000 cycles. This small
> optimization turns out to improve performance and latency significantly.
> 
> 
> Is this because of scheduling latency? I think this should be mentioned not just in the commit message but also as a comment in the code.

It tries to mitigate scheduling latencies on both ends (dom0 and domU)
when the ring buffer is the bottleneck (high-bandwidth connections). But
to be honest, it's mostly beneficial in the sendmsg case, because in
recvmsg the spin also turns into a busy-wait under normal circumstances,
when no data is actually available. I confirmed this with a quick iperf
test. I'll remove the spin from recvmsg and keep it in sendmsg.
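
For reference, dropping the spin from recvmsg while keeping the MSG_DONTWAIT check would reduce the wait to something like the sketch below; this is a guess at the follow-up change, not code from this series.

/* Hypothetical revision with the busy-spin removed: blocking callers
 * sleep right away until data or an error becomes available.
 */
while (!(flags & MSG_DONTWAIT) && !pvcalls_front_read_todo(map))
	wait_event_interruptible(map->active.inflight_conn_req,
				 pvcalls_front_read_todo(map));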


> 
> (I also think it's not "not enough data" but rather "no data"?)

you are right

Patch

diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
index d8ed280..b4ca569 100644
--- a/drivers/xen/pvcalls-front.c
+++ b/drivers/xen/pvcalls-front.c
@@ -96,6 +96,20 @@  static int pvcalls_front_write_todo(struct sock_mapping *map)
 	return size - pvcalls_queued(prod, cons, size);
 }
 
+static bool pvcalls_front_read_todo(struct sock_mapping *map)
+{
+	struct pvcalls_data_intf *intf = map->active.ring;
+	RING_IDX cons, prod;
+	int32_t error;
+
+	cons = intf->in_cons;
+	prod = intf->in_prod;
+	error = intf->in_error;
+	return (error != 0 ||
+		pvcalls_queued(prod, cons,
+			       XEN_FLEX_RING_SIZE(intf->ring_order))) != 0;
+}
+
 static irqreturn_t pvcalls_front_event_handler(int irq, void *dev_id)
 {
 	struct xenbus_device *dev = dev_id;
@@ -418,6 +432,98 @@  int pvcalls_front_sendmsg(struct socket *sock, struct msghdr *msg,
 	return tot_sent;
 }
 
+static int __read_ring(struct pvcalls_data_intf *intf,
+		       struct pvcalls_data *data,
+		       struct iov_iter *msg_iter,
+		       size_t len, int flags)
+{
+	RING_IDX cons, prod, size, masked_prod, masked_cons;
+	RING_IDX array_size = XEN_FLEX_RING_SIZE(intf->ring_order);
+	int32_t error;
+
+	cons = intf->in_cons;
+	prod = intf->in_prod;
+	error = intf->in_error;
+	/* get pointers before reading from the ring */
+	virt_rmb();
+	if (error < 0)
+		return error;
+
+	size = pvcalls_queued(prod, cons, array_size);
+	masked_prod = pvcalls_mask(prod, array_size);
+	masked_cons = pvcalls_mask(cons, array_size);
+
+	if (size == 0)
+		return 0;
+
+	if (len > size)
+		len = size;
+
+	if (masked_prod > masked_cons) {
+		copy_to_iter(data->in + masked_cons, len, msg_iter);
+	} else {
+		if (len > (array_size - masked_cons)) {
+			copy_to_iter(data->in + masked_cons,
+				     array_size - masked_cons, msg_iter);
+			copy_to_iter(data->in,
+				     len - (array_size - masked_cons),
+				     msg_iter);
+		} else {
+			copy_to_iter(data->in + masked_cons, len, msg_iter);
+		}
+	}
+	/* read data from the ring before increasing the index */
+	virt_mb();
+	if (!(flags & MSG_PEEK))
+		intf->in_cons += len;
+
+	return len;
+}
+
+int pvcalls_front_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
+		     int flags)
+{
+	struct pvcalls_bedata *bedata;
+	int ret = -EAGAIN;
+	struct sock_mapping *map;
+	int count = 0;
+
+	if (!pvcalls_front_dev)
+		return -ENOTCONN;
+	bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
+
+	map = (struct sock_mapping *) READ_ONCE(sock->sk->sk_send_head);
+	if (!map)
+		return -ENOTSOCK;
+
+	if (flags & (MSG_CMSG_CLOEXEC|MSG_ERRQUEUE|MSG_OOB|MSG_TRUNC))
+		return -EOPNOTSUPP;
+
+	mutex_lock(&map->active.in_mutex);
+	if (len > XEN_FLEX_RING_SIZE(map->active.ring->ring_order))
+		len = XEN_FLEX_RING_SIZE(map->active.ring->ring_order);
+
+	while (!(flags & MSG_DONTWAIT) && !pvcalls_front_read_todo(map)) {
+		if (count < PVCALLS_FRONT_MAX_SPIN)
+			count++;
+		else
+			wait_event_interruptible(map->active.inflight_conn_req,
+						 pvcalls_front_read_todo(map));
+	}
+	ret = __read_ring(map->active.ring, &map->active.data,
+			  &msg->msg_iter, len, flags);
+
+	if (ret > 0)
+		notify_remote_via_irq(map->active.irq);
+	if (ret == 0)
+		ret = -EAGAIN;
+	if (ret == -ENOTCONN)
+		ret = 0;
+
+	mutex_unlock(&map->active.in_mutex);
+	return ret;
+}
+
 int pvcalls_front_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
 {
 	struct pvcalls_bedata *bedata;
diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
index d937c24..de24041 100644
--- a/drivers/xen/pvcalls-front.h
+++ b/drivers/xen/pvcalls-front.h
@@ -16,5 +16,9 @@  int pvcalls_front_accept(struct socket *sock,
 int pvcalls_front_sendmsg(struct socket *sock,
 			  struct msghdr *msg,
 			  size_t len);
+int pvcalls_front_recvmsg(struct socket *sock,
+			  struct msghdr *msg,
+			  size_t len,
+			  int flags);
 
 #endif