diff mbox

[v4,09/13] xen/pvcalls: implement sendmsg

Message ID 1505516440-11111-9-git-send-email-sstabellini@kernel.org (mailing list archive)
State New, archived
Headers show

Commit Message

Stefano Stabellini Sept. 15, 2017, 11 p.m. UTC
Send data to an active socket by copying data to the "out" ring. Take
the active socket out_mutex so that only one function can access the
ring at any given time.

If not enough room is available on the ring, rather than returning
immediately or sleep-waiting, retry the write for up to 5000 iterations
(PVCALLS_FRONT_MAX_SPIN). This small optimization turns out to improve
performance significantly.

Signed-off-by: Stefano Stabellini <stefano@aporeto.com>
CC: boris.ostrovsky@oracle.com
CC: jgross@suse.com
---
 drivers/xen/pvcalls-front.c | 119 ++++++++++++++++++++++++++++++++++++++++++++
 drivers/xen/pvcalls-front.h |   3 ++
 2 files changed, 122 insertions(+)

Comments

Boris Ostrovsky Sept. 22, 2017, 9:57 p.m. UTC | #1
> +static bool pvcalls_front_write_todo(struct sock_mapping *map)
> +{
> +	struct pvcalls_data_intf *intf = map->active.ring;
> +	RING_IDX cons, prod, size = XEN_FLEX_RING_SIZE(PVCALLS_RING_ORDER);
> +	int32_t error;
> +
> +	cons = intf->out_cons;
> +	prod = intf->out_prod;
> +	error = intf->out_error;
> +	if (error == -ENOTCONN)
> +		return false;
> +	if (error != 0)
> +		return true;

Just like below, error processing can be moved up.

> +	return !!(size - pvcalls_queued(prod, cons, size));
> +}
> +
>  static irqreturn_t pvcalls_front_event_handler(int irq, void *dev_id)
>  {
>  	struct xenbus_device *dev = dev_id;
> @@ -363,6 +380,108 @@ int pvcalls_front_connect(struct socket *sock, struct sockaddr *addr,
>  	return ret;
>  }
>  
> +static int __write_ring(struct pvcalls_data_intf *intf,
> +			struct pvcalls_data *data,
> +			struct iov_iter *msg_iter,
> +			int len)
> +{
> +	RING_IDX cons, prod, size, masked_prod, masked_cons;
> +	RING_IDX array_size = XEN_FLEX_RING_SIZE(PVCALLS_RING_ORDER);
> +	int32_t error;
> +
> +	error = intf->out_error;
> +	if (error < 0)
> +		return error;
> +	cons = intf->out_cons;
> +	prod = intf->out_prod;
> +	/* read indexes before continuing */
> +	virt_mb();
> +
> +	size = pvcalls_queued(prod, cons, array_size);
> +	if (size >= array_size)
> +		return 0;


Is it possible to have size > array_size?


> +	if (len > array_size - size)
> +		len = array_size - size;
> +
> +	masked_prod = pvcalls_mask(prod, array_size);
> +	masked_cons = pvcalls_mask(cons, array_size);
> +
> +	if (masked_prod < masked_cons) {
> +		copy_from_iter(data->out + masked_prod, len, msg_iter);
> +	} else {
> +		if (len > array_size - masked_prod) {
> +			copy_from_iter(data->out + masked_prod,
> +				       array_size - masked_prod, msg_iter);
> +			copy_from_iter(data->out,
> +				       len - (array_size - masked_prod),
> +				       msg_iter);
> +		} else {
> +			copy_from_iter(data->out + masked_prod, len, msg_iter);
> +		}
> +	}
> +	/* write to ring before updating pointer */
> +	virt_wmb();
> +	intf->out_prod += len;
> +
> +	return len;


I know that you said you'd be changing len's type to int but now that I
am looking at it I wonder whether you could pass len as a 'size_t *' and
have this routine return error code (i.e. <=0).

OTOH, we'd be mixing up types again since RING_IDX is an unsigned int.

So I'll leave it to you (or anyone else reviewing this) to decide which
way is better.


> +}
> +
> +int pvcalls_front_sendmsg(struct socket *sock, struct msghdr *msg,
> +			  size_t len)

Also, the signature here looks suspicious --- you are trying to send
'size_t len' bytes but returning an int, which is how many bytes you've
actually sent. Right?


-boris
Stefano Stabellini Oct. 6, 2017, 9:43 p.m. UTC | #2
On Fri, 22 Sep 2017, Boris Ostrovsky wrote:
> > +static bool pvcalls_front_write_todo(struct sock_mapping *map)
> > +{
> > +	struct pvcalls_data_intf *intf = map->active.ring;
> > +	RING_IDX cons, prod, size = XEN_FLEX_RING_SIZE(PVCALLS_RING_ORDER);
> > +	int32_t error;
> > +
> > +	cons = intf->out_cons;
> > +	prod = intf->out_prod;
> > +	error = intf->out_error;
> > +	if (error == -ENOTCONN)
> > +		return false;
> > +	if (error != 0)
> > +		return true;
> 
> Just like below, error processing can be moved up.

OK


> > +	return !!(size - pvcalls_queued(prod, cons, size));
> > +}
> > +
> >  static irqreturn_t pvcalls_front_event_handler(int irq, void *dev_id)
> >  {
> >  	struct xenbus_device *dev = dev_id;
> > @@ -363,6 +380,108 @@ int pvcalls_front_connect(struct socket *sock, struct sockaddr *addr,
> >  	return ret;
> >  }
> >  
> > +static int __write_ring(struct pvcalls_data_intf *intf,
> > +			struct pvcalls_data *data,
> > +			struct iov_iter *msg_iter,
> > +			int len)
> > +{
> > +	RING_IDX cons, prod, size, masked_prod, masked_cons;
> > +	RING_IDX array_size = XEN_FLEX_RING_SIZE(PVCALLS_RING_ORDER);
> > +	int32_t error;
> > +
> > +	error = intf->out_error;
> > +	if (error < 0)
> > +		return error;
> > +	cons = intf->out_cons;
> > +	prod = intf->out_prod;
> > +	/* read indexes before continuing */
> > +	virt_mb();
> > +
> > +	size = pvcalls_queued(prod, cons, array_size);
> > +	if (size >= array_size)
> > +		return 0;
> 
> 
> Is it possible to have size > array_size?

Yes, if somebody makes a mistake in writing to prod. Of course, it is
not valid. I guess I could return error instead of 0.


> > +	if (len > array_size - size)
> > +		len = array_size - size;
> > +
> > +	masked_prod = pvcalls_mask(prod, array_size);
> > +	masked_cons = pvcalls_mask(cons, array_size);
> > +
> > +	if (masked_prod < masked_cons) {
> > +		copy_from_iter(data->out + masked_prod, len, msg_iter);
> > +	} else {
> > +		if (len > array_size - masked_prod) {
> > +			copy_from_iter(data->out + masked_prod,
> > +				       array_size - masked_prod, msg_iter);
> > +			copy_from_iter(data->out,
> > +				       len - (array_size - masked_prod),
> > +				       msg_iter);
> > +		} else {
> > +			copy_from_iter(data->out + masked_prod, len, msg_iter);
> > +		}
> > +	}
> > +	/* write to ring before updating pointer */
> > +	virt_wmb();
> > +	intf->out_prod += len;
> > +
> > +	return len;
> 
> 
> I know that you said you'd be changing len's type to int but now that I
> am looking at it I wonder whether you could pass len as a 'size_t *' and
> have this routine return error code (i.e. <=0).
> 
> OTOH, we'd be mixing up types again since RING_IDX is an unsigned int.
> 
> So I'll leave it to you (or anyone else reviewing this) to decide which
> way is better.

see below

> > +}
> > +
> > +int pvcalls_front_sendmsg(struct socket *sock, struct msghdr *msg,
> > +			  size_t len)
> 
> Also, the signature here looks suspicious --- you are trying to send
> 'size_t len' bytes but returning an int, which is how many bytes you've
> actually sent. Right?

Yes, but it is OK because it is limited by the size of the array which
is far smaller than INT_MAX (the array size is 262144). This is also why
I would just keep len as int.
diff mbox

Patch

diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
index 414eafd..2907e85 100644
--- a/drivers/xen/pvcalls-front.c
+++ b/drivers/xen/pvcalls-front.c
@@ -29,6 +29,7 @@ 
 #define PVCALLS_INVALID_ID UINT_MAX
 #define PVCALLS_RING_ORDER XENBUS_MAX_RING_GRANT_ORDER
 #define PVCALLS_NR_REQ_PER_RING __CONST_RING_SIZE(xen_pvcalls, XEN_PAGE_SIZE)
+#define PVCALLS_FRONT_MAX_SPIN 5000
 
 struct pvcalls_bedata {
 	struct xen_pvcalls_front_ring ring;
@@ -101,6 +102,22 @@  static inline int get_request(struct pvcalls_bedata *bedata, int *req_id)
 	return 0;
 }
 
+/*
+ * Tell whether a write on @map is worth attempting.
+ *
+ * Returns false when the out ring reports -ENOTCONN or when the number of
+ * queued bytes equals the ring size (no free space); returns true when any
+ * other error is pending or when the ring still has room.
+ */
+static bool pvcalls_front_write_todo(struct sock_mapping *map)
+{
+	struct pvcalls_data_intf *ring = map->active.ring;
+	RING_IDX ring_size = XEN_FLEX_RING_SIZE(PVCALLS_RING_ORDER);
+	RING_IDX out_cons, out_prod;
+	int32_t err = ring->out_error;
+
+	/* Look at the error first; the indexes only matter without one. */
+	if (err == -ENOTCONN)
+		return false;
+	if (err != 0)
+		return true;
+
+	out_cons = ring->out_cons;
+	out_prod = ring->out_prod;
+
+	/* Writable as long as the queued byte count differs from the size. */
+	return pvcalls_queued(out_prod, out_cons, ring_size) != ring_size;
+}
+
 static irqreturn_t pvcalls_front_event_handler(int irq, void *dev_id)
 {
 	struct xenbus_device *dev = dev_id;
@@ -363,6 +380,108 @@  int pvcalls_front_connect(struct socket *sock, struct sockaddr *addr,
 	return ret;
 }
 
+/*
+ * Copy up to @len bytes from @msg_iter into the "out" data ring and
+ * publish them by advancing intf->out_prod.
+ *
+ * @intf:     shared ring interface holding out_cons/out_prod/out_error
+ * @data:     ring buffers; bytes are written to data->out
+ * @msg_iter: source of the bytes to send
+ * @len:      maximum number of bytes to copy
+ *
+ * Returns the number of bytes actually copied to the ring (0 if the ring
+ * is full or nothing could be copied), the negative value found in
+ * intf->out_error if one is set, or -EINVAL if the indexes claim more
+ * queued data than the ring can hold.
+ */
+static int __write_ring(struct pvcalls_data_intf *intf,
+			struct pvcalls_data *data,
+			struct iov_iter *msg_iter,
+			int len)
+{
+	RING_IDX cons, prod, size, masked_prod, masked_cons;
+	RING_IDX array_size = XEN_FLEX_RING_SIZE(PVCALLS_RING_ORDER);
+	int32_t error;
+
+	error = intf->out_error;
+	if (error < 0)
+		return error;
+	cons = intf->out_cons;
+	prod = intf->out_prod;
+	/* read indexes before continuing */
+	virt_mb();
+
+	size = pvcalls_queued(prod, cons, array_size);
+	/* more bytes queued than the ring can hold: the indexes are invalid */
+	if (size > array_size)
+		return -EINVAL;
+	/* ring is full, nothing can be written right now */
+	if (size == array_size)
+		return 0;
+	if (len > array_size - size)
+		len = array_size - size;
+
+	masked_prod = pvcalls_mask(prod, array_size);
+	masked_cons = pvcalls_mask(cons, array_size);
+
+	/*
+	 * copy_from_iter() may copy fewer bytes than requested; only the
+	 * bytes really copied may be published through out_prod.
+	 */
+	if (masked_prod < masked_cons) {
+		len = copy_from_iter(data->out + masked_prod, len, msg_iter);
+	} else {
+		RING_IDX tail = array_size - masked_prod;
+
+		if (len > tail) {
+			/* the write wraps: fill the tail, then the start */
+			size_t copied = copy_from_iter(data->out + masked_prod,
+						       tail, msg_iter);
+
+			/* continue at the start only if the tail is full */
+			if (copied == tail)
+				copied += copy_from_iter(data->out,
+							 len - tail,
+							 msg_iter);
+			len = copied;
+		} else {
+			len = copy_from_iter(data->out + masked_prod, len,
+					     msg_iter);
+		}
+	}
+	/* write to ring before updating pointer */
+	virt_wmb();
+	intf->out_prod += len;
+
+	return len;
+}
+
+/*
+ * Send data on an active socket by copying it to the "out" ring.
+ *
+ * The socket's out_mutex is held while the ring is accessed. If the ring
+ * cannot take all the data at once, the write is retried up to
+ * PVCALLS_FRONT_MAX_SPIN times instead of returning or sleeping; the
+ * remote end is notified after every successful partial write.
+ *
+ * @sock: socket to send on; its mapping is taken from sk->sk_send_head
+ * @msg:  message to send; msg_flags and msg_iter are used
+ * @len:  number of bytes requested, clamped to INT_MAX
+ *
+ * Returns the number of bytes written to the ring, or a negative value:
+ * -ENOTCONN if there is no frontend device, -ENOTSOCK if the socket has
+ * no mapping, -EOPNOTSUPP for unsupported flags, -EAGAIN if MSG_DONTWAIT
+ * is set and pvcalls_front_write_todo() reports nothing to do, or the
+ * negative value returned by __write_ring().
+ */
+int pvcalls_front_sendmsg(struct socket *sock, struct msghdr *msg,
+			  size_t len)
+{
+	struct sock_mapping *map;
+	int sent, tot_sent = 0;
+	int count = 0, flags;
+
+	pvcalls_enter;
+	if (!pvcalls_front_dev) {
+		tot_sent = -ENOTCONN;
+		goto out;
+	}
+
+	map = (struct sock_mapping *) sock->sk->sk_send_head;
+	if (!map) {
+		tot_sent = -ENOTSOCK;
+		goto out;
+	}
+
+	flags = msg->msg_flags;
+	if (flags & (MSG_CONFIRM|MSG_DONTROUTE|MSG_EOR|MSG_OOB)) {
+		tot_sent = -EOPNOTSUPP;
+		goto out;
+	}
+
+	mutex_lock(&map->active.out_mutex);
+	if ((flags & MSG_DONTWAIT) && !pvcalls_front_write_todo(map)) {
+		tot_sent = -EAGAIN;
+		goto out_unlock;
+	}
+	/* __write_ring() takes and returns an int */
+	if (len > INT_MAX)
+		len = INT_MAX;
+
+	/* keep writing until done, an error occurs, or the spin budget ends */
+	do {
+		count++;
+		sent = __write_ring(map->active.ring,
+				    &map->active.data, &msg->msg_iter,
+				    len);
+		if (sent > 0) {
+			len -= sent;
+			tot_sent += sent;
+			notify_remote_via_irq(map->active.irq);
+		}
+	} while (sent >= 0 && len > 0 && count < PVCALLS_FRONT_MAX_SPIN);
+
+	/* an error replaces any partial byte count */
+	if (sent < 0)
+		tot_sent = sent;
+
+out_unlock:
+	mutex_unlock(&map->active.out_mutex);
+out:
+	pvcalls_exit;
+	return tot_sent;
+}
+
 int pvcalls_front_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
 {
 	struct pvcalls_bedata *bedata;
diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
index ab4f1da..d937c24 100644
--- a/drivers/xen/pvcalls-front.h
+++ b/drivers/xen/pvcalls-front.h
@@ -13,5 +13,8 @@  int pvcalls_front_bind(struct socket *sock,
 int pvcalls_front_accept(struct socket *sock,
 			 struct socket *newsock,
 			 int flags);
+int pvcalls_front_sendmsg(struct socket *sock,
+			  struct msghdr *msg,
+			  size_t len);
 
 #endif