Message ID | 1501017730-12797-9-git-send-email-sstabellini@kernel.org (mailing list archive)
---|---
State | New, archived
On 07/25/2017 05:22 PM, Stefano Stabellini wrote:
> Implement recvmsg by copying data from the "in" ring. If not enough data
> is available and the recvmsg call is blocking, then wait on the
> inflight_conn_req waitqueue. Take the active socket in_mutex so that
> only one function can access the ring at any given time.
>
> If not enough data is available on the ring, rather than returning
> immediately or sleep-waiting, spin for up to 5000 cycles. This small
> optimization turns out to improve performance and latency significantly.
>
> Signed-off-by: Stefano Stabellini <stefano@aporeto.com>
> CC: boris.ostrovsky@oracle.com
> CC: jgross@suse.com

[...]

> +	while (!(flags & MSG_DONTWAIT) && !pvcalls_front_read_todo(map)) {
> +		if (count < PVCALLS_FRONT_MAX_SPIN)
> +			count++;
> +		else
> +			wait_event_interruptible(map->active.inflight_conn_req,
> +					pvcalls_front_read_todo(map));
> +	}

Should we be using PVCALLS_FRONT_MAX_SPIN here? In sendmsg it is
counting non-sleeping iterations but here we are sleeping so
PVCALLS_FRONT_MAX_SPIN (5000) may take a while.

In fact, shouldn't this waiting be a function of MSG_DONTWAIT and/or
socket's O_NONBLOCK?

-boris
>> +	while (!(flags & MSG_DONTWAIT) && !pvcalls_front_read_todo(map)) {
>> +		if (count < PVCALLS_FRONT_MAX_SPIN)
>> +			count++;
>> +		else
>> +			wait_event_interruptible(map->active.inflight_conn_req,
>> +				pvcalls_front_read_todo(map));
>> +	}
> Should we be using PVCALLS_FRONT_MAX_SPIN here? In sendmsg it is
> counting non-sleeping iterations but here we are sleeping so
> PVCALLS_FRONT_MAX_SPIN (5000) may take a while.
>
> In fact, shouldn't this waiting be a function of MSG_DONTWAIT

err, which it already is. But the question still stands (except for
MSG_DONTWAIT).

-boris

> and/or socket's O_NONBLOCK?
On Wed, 26 Jul 2017, Boris Ostrovsky wrote:
> >> +			count++;
> >> +		else
> >> +			wait_event_interruptible(map->active.inflight_conn_req,
> >> +				pvcalls_front_read_todo(map));
> >> +	}
> > Should we be using PVCALLS_FRONT_MAX_SPIN here? In sendmsg it is
> > counting non-sleeping iterations but here we are sleeping so
> > PVCALLS_FRONT_MAX_SPIN (5000) may take a while.
> >
> > In fact, shouldn't this waiting be a function of MSG_DONTWAIT
>
> err, which it already is. But the question still stands (except for
> MSG_DONTWAIT).

The code (admittedly unintuitive) is busy-looping (non-sleeping) for
5000 iterations *before* attempting to sleep. So in that regard, recvmsg
and sendmsg use PVCALLS_FRONT_MAX_SPIN in the same way: only for
non-sleeping iterations.
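For context, the spin-then-sleep shape being discussed can be illustrated with a small standalone userspace program. This is not the driver code: the names MAX_SPIN, data_ready and spin_then_wait are invented for the example, and a pthread condition variable merely stands in for wait_event_interruptible().

/* Userspace sketch: re-check a condition for up to MAX_SPIN iterations
 * (busy-wait), and only then block on a condition variable.
 * Build with: gcc -pthread spin_then_wait.c */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

#define MAX_SPIN 5000	/* stands in for PVCALLS_FRONT_MAX_SPIN */

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static bool data_ready;		/* stands in for pvcalls_front_read_todo() */

static void spin_then_wait(void)
{
	int count = 0;

	while (!__atomic_load_n(&data_ready, __ATOMIC_ACQUIRE)) {
		if (count < MAX_SPIN) {
			/* First MAX_SPIN iterations: just re-check. */
			count++;
			continue;
		}
		/* After that: sleep until the producer signals. */
		pthread_mutex_lock(&lock);
		while (!data_ready)
			pthread_cond_wait(&cond, &lock);
		pthread_mutex_unlock(&lock);
	}
}

static void *producer(void *arg)
{
	(void)arg;
	pthread_mutex_lock(&lock);
	__atomic_store_n(&data_ready, true, __ATOMIC_RELEASE);
	pthread_cond_signal(&cond);
	pthread_mutex_unlock(&lock);
	return NULL;
}

int main(void)
{
	pthread_t t;

	pthread_create(&t, NULL, producer, NULL);
	spin_then_wait();
	pthread_join(t, NULL);
	printf("condition became true\n");
	return 0;
}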
On 07/26/2017 08:08 PM, Stefano Stabellini wrote:
> On Wed, 26 Jul 2017, Boris Ostrovsky wrote:
>>>> +			count++;
>>>> +		else
>>>> +			wait_event_interruptible(map->active.inflight_conn_req,
>>>> +				pvcalls_front_read_todo(map));
>>>> +	}
>>> Should we be using PVCALLS_FRONT_MAX_SPIN here? In sendmsg it is
>>> counting non-sleeping iterations but here we are sleeping so
>>> PVCALLS_FRONT_MAX_SPIN (5000) may take a while.
>>>
>>> In fact, shouldn't this waiting be a function of MSG_DONTWAIT
>> err, which it already is. But the question still stands (except for
>> MSG_DONTWAIT).
> The code (admittedly unintuitive) is busy-looping (non-sleeping) for
> 5000 iterations *before* attempting to sleep. So in that regard, recvmsg
> and sendmsg use PVCALLS_FRONT_MAX_SPIN in the same way: only for
> non-sleeping iterations.
>

OK.

Why not go directly into wait_event_interruptible()? I see you write in
the commit message:

    If not enough data is available on the ring, rather than returning
    immediately or sleep-waiting, spin for up to 5000 cycles. This small
    optimization turns out to improve performance and latency significantly.

Is this because of scheduling latency? I think this should be mentioned
not just in the commit message but also as a comment in the code.

(I also think it's not "not enough data" but rather "no data"?)

-boris
On Thu, 27 Jul 2017, Boris Ostrovsky wrote:
> On 07/26/2017 08:08 PM, Stefano Stabellini wrote:
> > On Wed, 26 Jul 2017, Boris Ostrovsky wrote:
> >>>> +			count++;
> >>>> +		else
> >>>> +			wait_event_interruptible(map->active.inflight_conn_req,
> >>>> +				pvcalls_front_read_todo(map));
> >>>> +	}
> >>> Should we be using PVCALLS_FRONT_MAX_SPIN here? In sendmsg it is
> >>> counting non-sleeping iterations but here we are sleeping so
> >>> PVCALLS_FRONT_MAX_SPIN (5000) may take a while.
> >>>
> >>> In fact, shouldn't this waiting be a function of MSG_DONTWAIT
> >> err, which it already is. But the question still stands (except for
> >> MSG_DONTWAIT).
> > The code (admittedly unintuitive) is busy-looping (non-sleeping) for
> > 5000 iterations *before* attempting to sleep. So in that regard, recvmsg
> > and sendmsg use PVCALLS_FRONT_MAX_SPIN in the same way: only for
> > non-sleeping iterations.
>
> OK.
>
> Why not go directly into wait_event_interruptible()? I see you write in
> the commit message:
>
>     If not enough data is available on the ring, rather than returning
>     immediately or sleep-waiting, spin for up to 5000 cycles. This small
>     optimization turns out to improve performance and latency significantly.
>
> Is this because of scheduling latency? I think this should be mentioned
> not just in the commit message but also as a comment in the code.

It tries to mitigate scheduling latencies on both ends (dom0 and domU)
when the ring buffer is the bottleneck (high bandwidth connections). But
to be honest with you, it's mostly beneficial in the sendmsg case,
because for recvmsg we also introduce a busy-wait in regular
circumstances, when no data is actually available. I confirmed this
statement with a quick iperf test. I'll remove the spin from recvmsg and
keep it in sendmsg.

> (I also think it's not "not enough data" but rather "no data"?)

You are right.
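For illustration only, here is a standalone userspace sketch of the wait policy Stefano says he will switch recvmsg to: no spin phase, non-blocking callers return -EAGAIN immediately, blocking callers sleep until data is available. This is not the posted driver code; all names are invented for the example and a pthread condition variable again stands in for wait_event_interruptible().

/* Userspace sketch of "no spin" waiting with a non-blocking fast path.
 * Build with: gcc -pthread wait_for_data.c */
#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static bool data_ready;		/* stands in for pvcalls_front_read_todo() */

static int wait_for_data(bool nonblocking)
{
	pthread_mutex_lock(&lock);
	if (!data_ready && nonblocking) {
		/* MSG_DONTWAIT/O_NONBLOCK-like behaviour: bail out. */
		pthread_mutex_unlock(&lock);
		return -EAGAIN;
	}
	while (!data_ready)
		pthread_cond_wait(&cond, &lock);
	pthread_mutex_unlock(&lock);
	return 0;
}

static void *producer(void *arg)
{
	(void)arg;
	pthread_mutex_lock(&lock);
	data_ready = true;
	pthread_cond_signal(&cond);
	pthread_mutex_unlock(&lock);
	return NULL;
}

int main(void)
{
	pthread_t t;

	printf("non-blocking first try: %d\n", wait_for_data(true));
	pthread_create(&t, NULL, producer, NULL);
	printf("blocking wait returned: %d\n", wait_for_data(false));
	pthread_join(t, NULL);
	return 0;
}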
diff --git a/drivers/xen/pvcalls-front.c b/drivers/xen/pvcalls-front.c
index d8ed280..b4ca569 100644
--- a/drivers/xen/pvcalls-front.c
+++ b/drivers/xen/pvcalls-front.c
@@ -96,6 +96,20 @@ static int pvcalls_front_write_todo(struct sock_mapping *map)
 	return size - pvcalls_queued(prod, cons, size);
 }
 
+static bool pvcalls_front_read_todo(struct sock_mapping *map)
+{
+	struct pvcalls_data_intf *intf = map->active.ring;
+	RING_IDX cons, prod;
+	int32_t error;
+
+	cons = intf->in_cons;
+	prod = intf->in_prod;
+	error = intf->in_error;
+	return (error != 0 ||
+		pvcalls_queued(prod, cons,
+			       XEN_FLEX_RING_SIZE(intf->ring_order))) != 0;
+}
+
 static irqreturn_t pvcalls_front_event_handler(int irq, void *dev_id)
 {
 	struct xenbus_device *dev = dev_id;
@@ -418,6 +432,98 @@ int pvcalls_front_sendmsg(struct socket *sock, struct msghdr *msg,
 	return tot_sent;
 }
 
+static int __read_ring(struct pvcalls_data_intf *intf,
+		       struct pvcalls_data *data,
+		       struct iov_iter *msg_iter,
+		       size_t len, int flags)
+{
+	RING_IDX cons, prod, size, masked_prod, masked_cons;
+	RING_IDX array_size = XEN_FLEX_RING_SIZE(intf->ring_order);
+	int32_t error;
+
+	cons = intf->in_cons;
+	prod = intf->in_prod;
+	error = intf->in_error;
+	/* get pointers before reading from the ring */
+	virt_rmb();
+	if (error < 0)
+		return error;
+
+	size = pvcalls_queued(prod, cons, array_size);
+	masked_prod = pvcalls_mask(prod, array_size);
+	masked_cons = pvcalls_mask(cons, array_size);
+
+	if (size == 0)
+		return 0;
+
+	if (len > size)
+		len = size;
+
+	if (masked_prod > masked_cons) {
+		copy_to_iter(data->in + masked_cons, len, msg_iter);
+	} else {
+		if (len > (array_size - masked_cons)) {
+			copy_to_iter(data->in + masked_cons,
+				     array_size - masked_cons, msg_iter);
+			copy_to_iter(data->in,
+				     len - (array_size - masked_cons),
+				     msg_iter);
+		} else {
+			copy_to_iter(data->in + masked_cons, len, msg_iter);
+		}
+	}
+	/* read data from the ring before increasing the index */
+	virt_mb();
+	if (!(flags & MSG_PEEK))
+		intf->in_cons += len;
+
+	return len;
+}
+
+int pvcalls_front_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
+			  int flags)
+{
+	struct pvcalls_bedata *bedata;
+	int ret = -EAGAIN;
+	struct sock_mapping *map;
+	int count = 0;
+
+	if (!pvcalls_front_dev)
+		return -ENOTCONN;
+	bedata = dev_get_drvdata(&pvcalls_front_dev->dev);
+
+	map = (struct sock_mapping *) READ_ONCE(sock->sk->sk_send_head);
+	if (!map)
+		return -ENOTSOCK;
+
+	if (flags & (MSG_CMSG_CLOEXEC|MSG_ERRQUEUE|MSG_OOB|MSG_TRUNC))
+		return -EOPNOTSUPP;
+
+	mutex_lock(&map->active.in_mutex);
+	if (len > XEN_FLEX_RING_SIZE(map->active.ring->ring_order))
+		len = XEN_FLEX_RING_SIZE(map->active.ring->ring_order);
+
+	while (!(flags & MSG_DONTWAIT) && !pvcalls_front_read_todo(map)) {
+		if (count < PVCALLS_FRONT_MAX_SPIN)
+			count++;
+		else
+			wait_event_interruptible(map->active.inflight_conn_req,
+						 pvcalls_front_read_todo(map));
+	}
+	ret = __read_ring(map->active.ring, &map->active.data,
+			  &msg->msg_iter, len, flags);
+
+	if (ret > 0)
+		notify_remote_via_irq(map->active.irq);
+	if (ret == 0)
+		ret = -EAGAIN;
+	if (ret == -ENOTCONN)
+		ret = 0;
+
+	mutex_unlock(&map->active.in_mutex);
+	return ret;
+}
+
 int pvcalls_front_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
 {
 	struct pvcalls_bedata *bedata;
diff --git a/drivers/xen/pvcalls-front.h b/drivers/xen/pvcalls-front.h
index d937c24..de24041 100644
--- a/drivers/xen/pvcalls-front.h
+++ b/drivers/xen/pvcalls-front.h
@@ -16,5 +16,9 @@ int pvcalls_front_accept(struct socket *sock,
 int pvcalls_front_sendmsg(struct socket *sock,
 			  struct msghdr *msg,
 			  size_t len);
+int pvcalls_front_recvmsg(struct socket *sock,
+			  struct msghdr *msg,
+			  size_t len,
+			  int flags);
 
 #endif
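As a reading aid for the wraparound copy in __read_ring above, here is a small standalone userspace sketch of reading from a power-of-two ring with masked producer/consumer indices. The struct, sizes and the peek flag are invented for the example, memcpy stands in for copy_to_iter(), and the real driver additionally needs the virt_rmb()/virt_mb() barriers shown in the patch because the other end updates the ring concurrently.

/* Single-threaded illustration of the masked ring read with wraparound. */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

#define RING_SIZE 8u			/* must be a power of two */
#define RING_MASK(i) ((i) & (RING_SIZE - 1))

struct demo_ring {
	uint8_t data[RING_SIZE];
	uint32_t cons;			/* total bytes consumed so far */
	uint32_t prod;			/* total bytes produced so far */
};

static size_t ring_read(struct demo_ring *r, uint8_t *dst, size_t len, bool peek)
{
	uint32_t queued = r->prod - r->cons;	/* bytes available */
	uint32_t masked_cons = RING_MASK(r->cons);

	if (len > queued)
		len = queued;

	if (masked_cons + len > RING_SIZE) {
		/* Data wraps: copy the tail of the buffer, then the head. */
		size_t tail = RING_SIZE - masked_cons;

		memcpy(dst, r->data + masked_cons, tail);
		memcpy(dst + tail, r->data, len - tail);
	} else {
		memcpy(dst, r->data + masked_cons, len);
	}

	if (!peek)		/* MSG_PEEK-like behaviour: leave cons alone */
		r->cons += len;
	return len;
}

int main(void)
{
	struct demo_ring r = { .cons = 6, .prod = 6 };
	uint8_t out[RING_SIZE];
	size_t n;

	/* Produce 5 bytes starting near the end so the read wraps. */
	for (int i = 0; i < 5; i++)
		r.data[RING_MASK(r.prod++)] = 'a' + i;

	n = ring_read(&r, out, sizeof(out), false);
	printf("read %zu bytes: %.*s\n", n, (int)n, (const char *)out);
	return 0;
}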
Implement recvmsg by copying data from the "in" ring. If not enough data
is available and the recvmsg call is blocking, then wait on the
inflight_conn_req waitqueue. Take the active socket in_mutex so that
only one function can access the ring at any given time.

If not enough data is available on the ring, rather than returning
immediately or sleep-waiting, spin for up to 5000 cycles. This small
optimization turns out to improve performance and latency significantly.

Signed-off-by: Stefano Stabellini <stefano@aporeto.com>
CC: boris.ostrovsky@oracle.com
CC: jgross@suse.com
---
 drivers/xen/pvcalls-front.c | 106 ++++++++++++++++++++++++++++++++++++++++++++
 drivers/xen/pvcalls-front.h |   4 ++
 2 files changed, 110 insertions(+)