diff mbox series

[v3,5/5] vhost: add an RPMsg API

Message ID 20200527180541.5570-6-guennadi.liakhovetski@linux.intel.com (mailing list archive)
State Superseded
Headers show
Series Add a vhost RPMsg API | expand

Commit Message

Guennadi Liakhovetski May 27, 2020, 6:05 p.m. UTC
Linux supports running the RPMsg protocol over the VirtIO transport
protocol, but currently there is only support for VirtIO clients and
no support for a VirtIO server. This patch adds a vhost-based RPMsg
server implementation.

Signed-off-by: Guennadi Liakhovetski <guennadi.liakhovetski@linux.intel.com>
---
 drivers/vhost/Kconfig       |   7 +
 drivers/vhost/Makefile      |   3 +
 drivers/vhost/rpmsg.c       | 382 ++++++++++++++++++++++++++++++++++++++++++++
 drivers/vhost/vhost_rpmsg.h |  74 +++++++++
 4 files changed, 466 insertions(+)
 create mode 100644 drivers/vhost/rpmsg.c
 create mode 100644 drivers/vhost/vhost_rpmsg.h

Comments

Mathieu Poirier May 28, 2020, 7:26 p.m. UTC | #1
On Wed, 27 May 2020 at 12:05, Guennadi Liakhovetski
<guennadi.liakhovetski@linux.intel.com> wrote:
>
> Linux supports running the RPMsg protocol over the VirtIO transport
> protocol, but currently there is only support for VirtIO clients and
> no support for a VirtIO server. This patch adds a vhost-based RPMsg
> server implementation.
>
> Signed-off-by: Guennadi Liakhovetski <guennadi.liakhovetski@linux.intel.com>
> ---
>  drivers/vhost/Kconfig       |   7 +
>  drivers/vhost/Makefile      |   3 +
>  drivers/vhost/rpmsg.c       | 382 ++++++++++++++++++++++++++++++++++++++++++++
>  drivers/vhost/vhost_rpmsg.h |  74 +++++++++
>  4 files changed, 466 insertions(+)
>  create mode 100644 drivers/vhost/rpmsg.c
>  create mode 100644 drivers/vhost/vhost_rpmsg.h
>
> diff --git a/drivers/vhost/Kconfig b/drivers/vhost/Kconfig
> index 2c75d16..8b91f3e 100644
> --- a/drivers/vhost/Kconfig
> +++ b/drivers/vhost/Kconfig
> @@ -38,6 +38,13 @@ config VHOST_NET
>           To compile this driver as a module, choose M here: the module will
>           be called vhost_net.
>
> +config VHOST_RPMSG
> +       tristate
> +       depends on VHOST
> +       help
> +         Vhost RPMsg API allows vhost drivers to communicate with VirtIO
> +         drivers, using the RPMsg over VirtIO protocol.
> +
>  config VHOST_SCSI
>         tristate "VHOST_SCSI TCM fabric driver"
>         depends on TARGET_CORE && EVENTFD
> diff --git a/drivers/vhost/Makefile b/drivers/vhost/Makefile
> index f3e1897..9cf459d 100644
> --- a/drivers/vhost/Makefile
> +++ b/drivers/vhost/Makefile
> @@ -2,6 +2,9 @@
>  obj-$(CONFIG_VHOST_NET) += vhost_net.o
>  vhost_net-y := net.o
>
> +obj-$(CONFIG_VHOST_RPMSG) += vhost_rpmsg.o
> +vhost_rpmsg-y := rpmsg.o
> +
>  obj-$(CONFIG_VHOST_SCSI) += vhost_scsi.o
>  vhost_scsi-y := scsi.o
>
> diff --git a/drivers/vhost/rpmsg.c b/drivers/vhost/rpmsg.c
> new file mode 100644
> index 00000000..ea77e1f
> --- /dev/null
> +++ b/drivers/vhost/rpmsg.c
> @@ -0,0 +1,382 @@
> +// SPDX-License-Identifier: GPL-2.0-only
> +/*
> + * Copyright(c) 2020 Intel Corporation. All rights reserved.
> + *
> + * Author: Guennadi Liakhovetski <guennadi.liakhovetski@linux.intel.com>
> + *
> + * Vhost RPMsg VirtIO interface. It provides a set of functions to match the
> + * guest side RPMsg VirtIO API, provided by drivers/rpmsg/virtio_rpmsg_bus.c
> + * These functions handle creation of 2 virtual queues, handling of endpoint
> + * addresses, sending a name-space announcement to the guest as well as any
> + * user messages. This API can be used by any vhost driver to handle RPMsg
> + * specific processing.
> + * Specific vhost drivers, using this API will use their own VirtIO device
> + * IDs, that should then also be added to the ID table in virtio_rpmsg_bus.c
> + */
> +

Thank you for adding that.

Acked-by: Mathieu Poirier <mathieu.poirier@linaro.org>

> +#include <linux/compat.h>
> +#include <linux/file.h>
> +#include <linux/miscdevice.h>
> +#include <linux/module.h>
> +#include <linux/mutex.h>
> +#include <linux/vhost.h>
> +#include <linux/virtio_rpmsg.h>
> +#include <uapi/linux/rpmsg.h>
> +
> +#include "vhost.h"
> +#include "vhost_rpmsg.h"
> +
> +/*
> + * All virtio-rpmsg virtual queue kicks always come with just one buffer -
> + * either input or output
> + */
> +static int vhost_rpmsg_get_single(struct vhost_virtqueue *vq)
> +{
> +       struct vhost_rpmsg *vr = container_of(vq->dev, struct vhost_rpmsg, dev);
> +       unsigned int out, in;
> +       int head = vhost_get_vq_desc(vq, vq->iov, ARRAY_SIZE(vq->iov),
> +                                    &out, &in, NULL, NULL);
> +       if (head < 0) {
> +               vq_err(vq, "%s(): error %d getting buffer\n",
> +                      __func__, head);
> +               return head;
> +       }
> +
> +       /* Nothing new? */
> +       if (head == vq->num)
> +               return head;
> +
> +       if (vq == &vr->vq[VIRTIO_RPMSG_RESPONSE] && (out || in != 1)) {
> +               vq_err(vq,
> +                      "%s(): invalid %d input and %d output in response queue\n",
> +                      __func__, in, out);
> +               goto return_buf;
> +       }
> +
> +       if (vq == &vr->vq[VIRTIO_RPMSG_REQUEST] && (in || out != 1)) {
> +               vq_err(vq,
> +                      "%s(): invalid %d input and %d output in request queue\n",
> +                      __func__, in, out);
> +               goto return_buf;
> +       }
> +
> +       return head;
> +
> +return_buf:
> +       /*
> +        * FIXME: might need to return the buffer using vhost_add_used()
> +        * or vhost_discard_vq_desc(). vhost_discard_vq_desc() is
> +        * described as "being useful for error handling," but it makes
> +        * the thus discarded buffers "unseen," so next time we look we
> +        * retrieve them again?
> +        */
> +       return -EINVAL;
> +}
> +
> +static const struct vhost_rpmsg_ept *vhost_rpmsg_ept_find(
> +                                       struct vhost_rpmsg *vr, int addr)
> +{
> +       unsigned int i;
> +
> +       for (i = 0; i < vr->n_epts; i++)
> +               if (vr->ept[i].addr == addr)
> +                       return vr->ept + i;
> +
> +       return NULL;
> +}
> +
> +/*
> + * if len < 0, then for reading a request, the complete virtual queue buffer
> + * size is prepared, for sending a response, the length in the iterator is used
> + */
> +int vhost_rpmsg_start_lock(struct vhost_rpmsg *vr,
> +                          struct vhost_rpmsg_iter *iter,
> +                          unsigned int qid, ssize_t len)
> +       __acquires(vq->mutex)
> +{
> +       struct vhost_virtqueue *vq = vr->vq + qid;
> +       size_t tmp;
> +
> +       if (qid >= VIRTIO_RPMSG_NUM_OF_VQS)
> +               return -EINVAL;
> +
> +       iter->vq = vq;
> +
> +       mutex_lock(&vq->mutex);
> +       vhost_disable_notify(&vr->dev, vq);
> +
> +       iter->head = vhost_rpmsg_get_single(vq);
> +       if (iter->head == vq->num)
> +               iter->head = -EAGAIN;
> +
> +       if (iter->head < 0)
> +               goto unlock;
> +
> +       tmp = vq->iov[0].iov_len;
> +       if (tmp < sizeof(iter->rhdr)) {
> +               vq_err(vq, "%s(): size %zu too small\n", __func__, tmp);
> +               iter->head = -ENOBUFS;
> +               goto return_buf;
> +       }
> +
> +       switch (qid) {
> +       case VIRTIO_RPMSG_REQUEST:
> +               if (len < 0) {
> +                       len = tmp - sizeof(iter->rhdr);
> +               } else if (tmp < sizeof(iter->rhdr) + len) {
> +                       iter->head = -ENOBUFS;
> +                       goto return_buf;
> +               }
> +
> +               /* len is now the size of the payload */
> +               iov_iter_init(&iter->iov_iter, WRITE,
> +                             vq->iov, 1, sizeof(iter->rhdr) + len);
> +
> +               /* Read the RPMSG header with endpoint addresses */
> +               tmp = copy_from_iter(&iter->rhdr, sizeof(iter->rhdr),
> +                                    &iter->iov_iter);
> +               if (tmp != sizeof(iter->rhdr)) {
> +                       vq_err(vq, "%s(): got %zu instead of %zu\n", __func__,
> +                              tmp, sizeof(iter->rhdr));
> +                       iter->head = -EIO;
> +                       goto return_buf;
> +               }
> +
> +               iter->ept = vhost_rpmsg_ept_find(vr, iter->rhdr.dst);
> +               if (!iter->ept) {
> +                       vq_err(vq, "%s(): no endpoint with address %d\n",
> +                              __func__, iter->rhdr.dst);
> +                       iter->head = -ENOENT;
> +                       goto return_buf;
> +               }
> +
> +               /* Let the endpoint read the payload */
> +               if (iter->ept->read) {
> +                       ssize_t ret = iter->ept->read(vr, iter);
> +
> +                       if (ret < 0) {
> +                               iter->head = ret;
> +                               goto return_buf;
> +                       }
> +
> +                       iter->rhdr.len = ret;
> +               } else {
> +                       iter->rhdr.len = 0;
> +               }
> +
> +               /* Prepare for the response phase */
> +               iter->rhdr.dst = iter->rhdr.src;
> +               iter->rhdr.src = iter->ept->addr;
> +
> +               break;
> +       case VIRTIO_RPMSG_RESPONSE:
> +               if (!iter->ept && iter->rhdr.dst != RPMSG_NS_ADDR) {
> +                       /*
> +                        * Usually the iterator is configured when processing a
> +                        * message on the request queue, but it's also possible
> +                        * to send a message on the response queue without a
> +                        * preceding request, in that case the iterator must
> +                        * contain source and destination addresses.
> +                        */
> +                       iter->ept = vhost_rpmsg_ept_find(vr, iter->rhdr.src);
> +                       if (!iter->ept) {
> +                               iter->head = -ENOENT;
> +                               goto return_buf;
> +                       }
> +               }
> +
> +               if (len < 0) {
> +                       len = tmp - sizeof(iter->rhdr);
> +               } else if (tmp < sizeof(iter->rhdr) + len) {
> +                       iter->head = -ENOBUFS;
> +                       goto return_buf;
> +               } else {
> +                       iter->rhdr.len = len;
> +               }
> +
> +               /* len is now the size of the payload */
> +               iov_iter_init(&iter->iov_iter, READ,
> +                             vq->iov, 1, sizeof(iter->rhdr) + len);
> +
> +               /* Write the RPMSG header with endpoint addresses */
> +               tmp = copy_to_iter(&iter->rhdr, sizeof(iter->rhdr),
> +                                  &iter->iov_iter);
> +               if (tmp != sizeof(iter->rhdr)) {
> +                       iter->head = -EIO;
> +                       goto return_buf;
> +               }
> +
> +               /* Let the endpoint write the payload */
> +               if (iter->ept && iter->ept->write) {
> +                       ssize_t ret = iter->ept->write(vr, iter);
> +
> +                       if (ret < 0) {
> +                               iter->head = ret;
> +                               goto return_buf;
> +                       }
> +               }
> +
> +               break;
> +       }
> +
> +       return 0;
> +
> +return_buf:
> +       /*
> +        * FIXME: vhost_discard_vq_desc() or vhost_add_used(), see comment in
> +        * vhost_rpmsg_get_single()
> +        */
> +unlock:
> +       vhost_enable_notify(&vr->dev, vq);
> +       mutex_unlock(&vq->mutex);
> +
> +       return iter->head;
> +}
> +EXPORT_SYMBOL_GPL(vhost_rpmsg_start_lock);
> +
> +size_t vhost_rpmsg_copy(struct vhost_rpmsg *vr, struct vhost_rpmsg_iter *iter,
> +                       void *data, size_t size)
> +{
> +       /*
> +        * We could check for excess data, but copy_{to,from}_iter() don't do
> +        * that either
> +        */
> +       if (iter->vq == vr->vq + VIRTIO_RPMSG_RESPONSE)
> +               return copy_to_iter(data, size, &iter->iov_iter);
> +
> +       return copy_from_iter(data, size, &iter->iov_iter);
> +}
> +EXPORT_SYMBOL_GPL(vhost_rpmsg_copy);
> +
> +int vhost_rpmsg_finish_unlock(struct vhost_rpmsg *vr,
> +                             struct vhost_rpmsg_iter *iter)
> +       __releases(vq->mutex)
> +{
> +       if (iter->head >= 0)
> +               vhost_add_used_and_signal(iter->vq->dev, iter->vq, iter->head,
> +                                         iter->rhdr.len + sizeof(iter->rhdr));
> +
> +       vhost_enable_notify(&vr->dev, iter->vq);
> +       mutex_unlock(&iter->vq->mutex);
> +
> +       return iter->head;
> +}
> +EXPORT_SYMBOL_GPL(vhost_rpmsg_finish_unlock);
> +
> +/*
> + * Return false to terminate the external loop only if we fail to obtain either
> + * a request or a response buffer
> + */
> +static bool handle_rpmsg_req_single(struct vhost_rpmsg *vr,
> +                                   struct vhost_virtqueue *vq)
> +{
> +       struct vhost_rpmsg_iter iter;
> +       int ret = vhost_rpmsg_start_lock(vr, &iter, VIRTIO_RPMSG_REQUEST,
> +                                        -EINVAL);
> +       if (!ret)
> +               ret = vhost_rpmsg_finish_unlock(vr, &iter);
> +       if (ret < 0) {
> +               if (ret != -EAGAIN)
> +                       vq_err(vq, "%s(): RPMSG processing failed %d\n",
> +                              __func__, ret);
> +               return false;
> +       }
> +
> +       if (!iter.ept->write)
> +               return true;
> +
> +       ret = vhost_rpmsg_start_lock(vr, &iter, VIRTIO_RPMSG_RESPONSE,
> +                                    -EINVAL);
> +       if (!ret)
> +               ret = vhost_rpmsg_finish_unlock(vr, &iter);
> +       if (ret < 0) {
> +               vq_err(vq, "%s(): RPMSG finalising failed %d\n", __func__, ret);
> +               return false;
> +       }
> +
> +       return true;
> +}
> +
> +static void handle_rpmsg_req_kick(struct vhost_work *work)
> +{
> +       struct vhost_virtqueue *vq = container_of(work, struct vhost_virtqueue,
> +                                                 poll.work);
> +       struct vhost_rpmsg *vr = container_of(vq->dev, struct vhost_rpmsg, dev);
> +
> +       while (handle_rpmsg_req_single(vr, vq))
> +               ;
> +}
> +
> +/*
> + * initialise two virtqueues with an array of endpoints,
> + * request and response callbacks
> + */
> +void vhost_rpmsg_init(struct vhost_rpmsg *vr, const struct vhost_rpmsg_ept *ept,
> +                     unsigned int n_epts)
> +{
> +       unsigned int i;
> +
> +       for (i = 0; i < ARRAY_SIZE(vr->vq); i++)
> +               vr->vq_p[i] = &vr->vq[i];
> +
> +       /* vq[0]: host -> guest, vq[1]: host <- guest */
> +       vr->vq[VIRTIO_RPMSG_REQUEST].handle_kick = handle_rpmsg_req_kick;
> +       vr->vq[VIRTIO_RPMSG_RESPONSE].handle_kick = NULL;
> +
> +       vr->ept = ept;
> +       vr->n_epts = n_epts;
> +
> +       vhost_dev_init(&vr->dev, vr->vq_p, VIRTIO_RPMSG_NUM_OF_VQS,
> +                      UIO_MAXIOV, 0, 0, NULL);
> +}
> +EXPORT_SYMBOL_GPL(vhost_rpmsg_init);
> +
> +void vhost_rpmsg_destroy(struct vhost_rpmsg *vr)
> +{
> +       if (vhost_dev_has_owner(&vr->dev))
> +               vhost_poll_flush(&vr->vq[VIRTIO_RPMSG_REQUEST].poll);
> +
> +       vhost_dev_cleanup(&vr->dev);
> +}
> +EXPORT_SYMBOL_GPL(vhost_rpmsg_destroy);
> +
> +/* send namespace */
> +int vhost_rpmsg_ns_announce(struct vhost_rpmsg *vr, const char *name,
> +                           unsigned int src)
> +{
> +       struct vhost_rpmsg_iter iter = {
> +               .rhdr = {
> +                       .src = 0,
> +                       .dst = RPMSG_NS_ADDR,
> +                       .flags = RPMSG_NS_CREATE, /* rpmsg_recv_single() */
> +               },
> +       };
> +       struct rpmsg_ns_msg ns = {
> +               .addr = src,
> +               .flags = RPMSG_NS_CREATE, /* for rpmsg_ns_cb() */
> +       };
> +       int ret = vhost_rpmsg_start_lock(vr, &iter, VIRTIO_RPMSG_RESPONSE,
> +                                        sizeof(ns));
> +
> +       if (ret < 0)
> +               return ret;
> +
> +       strlcpy(ns.name, name, sizeof(ns.name));
> +
> +       ret = vhost_rpmsg_copy(vr, &iter, &ns, sizeof(ns));
> +       if (ret != sizeof(ns))
> +               vq_err(iter.vq, "%s(): added %d instead of %zu bytes\n",
> +                      __func__, ret, sizeof(ns));
> +
> +       ret = vhost_rpmsg_finish_unlock(vr, &iter);
> +       if (ret < 0)
> +               vq_err(iter.vq, "%s(): namespace announcement failed: %d\n",
> +                      __func__, ret);
> +
> +       return ret;
> +}
> +EXPORT_SYMBOL_GPL(vhost_rpmsg_ns_announce);
> +
> +MODULE_LICENSE("GPL v2");
> +MODULE_AUTHOR("Intel, Inc.");
> +MODULE_DESCRIPTION("Vhost RPMsg API");
> diff --git a/drivers/vhost/vhost_rpmsg.h b/drivers/vhost/vhost_rpmsg.h
> new file mode 100644
> index 00000000..a3d0dda
> --- /dev/null
> +++ b/drivers/vhost/vhost_rpmsg.h
> @@ -0,0 +1,74 @@
> +/* SPDX-License-Identifier: GPL-2.0 */
> +/*
> + * Copyright(c) 2020 Intel Corporation. All rights reserved.
> + *
> + * Author: Guennadi Liakhovetski <guennadi.liakhovetski@linux.intel.com>
> + */
> +
> +#ifndef VHOST_RPMSG_H
> +#define VHOST_RPMSG_H
> +
> +#include <linux/uio.h>
> +#include <linux/virtio_rpmsg.h>
> +
> +#include "vhost.h"
> +
> +/* RPMsg uses two VirtQueues: one for each direction */
> +enum {
> +       VIRTIO_RPMSG_RESPONSE,  /* RPMsg response (host->guest) buffers */
> +       VIRTIO_RPMSG_REQUEST,   /* RPMsg request (guest->host) buffers */
> +       /* Keep last */
> +       VIRTIO_RPMSG_NUM_OF_VQS,
> +};
> +
> +struct vhost_rpmsg_ept;
> +
> +struct vhost_rpmsg_iter {
> +       struct iov_iter iov_iter;
> +       struct rpmsg_hdr rhdr;
> +       struct vhost_virtqueue *vq;
> +       const struct vhost_rpmsg_ept *ept;
> +       int head;
> +       void *priv;
> +};
> +
> +struct vhost_rpmsg {
> +       struct vhost_dev dev;
> +       struct vhost_virtqueue vq[VIRTIO_RPMSG_NUM_OF_VQS];
> +       struct vhost_virtqueue *vq_p[VIRTIO_RPMSG_NUM_OF_VQS];
> +       const struct vhost_rpmsg_ept *ept;
> +       unsigned int n_epts;
> +};
> +
> +struct vhost_rpmsg_ept {
> +       ssize_t (*read)(struct vhost_rpmsg *, struct vhost_rpmsg_iter *);
> +       ssize_t (*write)(struct vhost_rpmsg *, struct vhost_rpmsg_iter *);
> +       int addr;
> +};
> +
> +static inline size_t vhost_rpmsg_iter_len(const struct vhost_rpmsg_iter *iter)
> +{
> +       return iter->rhdr.len;
> +}
> +
> +#define VHOST_RPMSG_ITER(_src, _dst) { \
> +       .rhdr = {                       \
> +                       .src = _src,    \
> +                       .dst = _dst,    \
> +               },                      \
> +       }
> +
> +void vhost_rpmsg_init(struct vhost_rpmsg *vr, const struct vhost_rpmsg_ept *ept,
> +                     unsigned int n_epts);
> +void vhost_rpmsg_destroy(struct vhost_rpmsg *vr);
> +int vhost_rpmsg_ns_announce(struct vhost_rpmsg *vr, const char *name,
> +                           unsigned int src);
> +int vhost_rpmsg_start_lock(struct vhost_rpmsg *vr,
> +                          struct vhost_rpmsg_iter *iter,
> +                          unsigned int qid, ssize_t len);
> +size_t vhost_rpmsg_copy(struct vhost_rpmsg *vr, struct vhost_rpmsg_iter *iter,
> +                       void *data, size_t size);
> +int vhost_rpmsg_finish_unlock(struct vhost_rpmsg *vr,
> +                             struct vhost_rpmsg_iter *iter);
> +
> +#endif
> --
> 1.9.3
>
Vincent Whitchurch June 17, 2020, 7:17 p.m. UTC | #2
On Wed, May 27, 2020 at 08:05:41PM +0200, Guennadi Liakhovetski wrote:
> Linux supports running the RPMsg protocol over the VirtIO transport
> protocol, but currently there is only support for VirtIO clients and
> no support for a VirtIO server. This patch adds a vhost-based RPMsg
> server implementation.

This looks really useful, but why is it implemented as an API and not as
a real vhost driver which implements an rpmsg bus?  If you implement it
as a vhost driver which implements rpmsg_device_ops and
rpmsg_endpoint_ops, then wouldn't you be able to implement your
vhost-sof driver using the normal rpmsg APIs?

I tried quickly hooking up this code to such a vhost driver and I was
able to communicate between host and guest systems with both
rpmsg-client-sample and rpmsg-char with almost no modifications to
those drivers.
Guennadi Liakhovetski June 18, 2020, 9:03 a.m. UTC | #3
Hi Vincent,

On Wed, Jun 17, 2020 at 09:17:42PM +0200, Vincent Whitchurch wrote:
> On Wed, May 27, 2020 at 08:05:41PM +0200, Guennadi Liakhovetski wrote:
> > Linux supports running the RPMsg protocol over the VirtIO transport
> > protocol, but currently there is only support for VirtIO clients and
> > no support for a VirtIO server. This patch adds a vhost-based RPMsg
> > server implementation.
> 
> This looks really useful, but why is it implemented as an API and not as
> a real vhost driver which implements an rpmsg bus?  If you implement it
> as a vhost driver which implements rpmsg_device_ops and
> rpmsg_endpoint_ops, then wouldn't you be able to implement your
> vhost-sof driver using the normal rpmsg APIs?

Sorry, not sure what you mean by the "normal rpmsg API?" Do you mean the 
VirtIO RPMsg API? But that's the opposite side of the link - that's the 
guest side in the VM case and the Linux side in the remoteproc case. What 
this API is adding is a vhost RPMsg API. The kernel vhost framework 
itself is essentially a library of functions. Kernel vhost drivers simply 
create a misc device and use the vhost functions for some common 
functionality. This RPMsg vhost API stays in the same concept and provides 
further functions for RPMsg specific vhost operation.

> I tried quickly hooking up this code to such a vhost driver and I was
> able to communicate between host and guest systems with both
> rpmsg-client-sample and rpmsg-char with almost no modifications to
> those drivers.

You mean you used this patch to create RPMsg vhost drivers? Without 
creating a vhost RPMsg bus? Nice, glad to hear that!

Thanks
Guennadi
Vincent Whitchurch June 18, 2020, 9:33 a.m. UTC | #4
On Thu, Jun 18, 2020 at 11:03:42AM +0200, Guennadi Liakhovetski wrote:
> On Wed, Jun 17, 2020 at 09:17:42PM +0200, Vincent Whitchurch wrote:
> > On Wed, May 27, 2020 at 08:05:41PM +0200, Guennadi Liakhovetski wrote:
> > > Linux supports running the RPMsg protocol over the VirtIO transport
> > > protocol, but currently there is only support for VirtIO clients and
> > > no support for a VirtIO server. This patch adds a vhost-based RPMsg
> > > server implementation.
> > 
> > This looks really useful, but why is it implemented as an API and not as
> > a real vhost driver which implements an rpmsg bus?  If you implement it
> > as a vhost driver which implements rpmsg_device_ops and
> > rpmsg_endpoint_ops, then wouldn't you be able to implement your
> > vhost-sof driver using the normal rpmsg APIs?
> 
> Sorry, not sure what you mean by the "normal rpmsg API?" Do you mean the 
> VirtIO RPMsg API? But that's the opposite side of the link - that's the 
> guest side in the VM case and the Linux side in the remoteproc case. What 
> this API is adding is a vhost RPMsg API. The kernel vhost framework 
> itself is essentially a library of functions. Kernel vhost drivers simply 
> create a misc device and use the vhost functions for some common 
> functionality. This RPMsg vhost API stays in the same concept and provides 
> further functions for RPMsg specific vhost operation.

By the "normal rpmsg API" I mean register_rpmsg_driver(), rpmsg_send(),
etc.  That API is not tied to virtio in any way and there are other
non-virtio backends for this API in the tree.  So it seems quite natural
to implement a vhost backend for this API so that both sides of the link
can use the same API but different backends, instead of forcing them to
use different APIs.

> > I tried quickly hooking up this code to such a vhost driver and I was
> > able to communicate between host and guest systems with both
> > rpmsg-client-sample and rpmsg-char with almost no modifications to
> > those drivers.
> 
> You mean you used this patch to create RPMsg vhost drivers? Without 
> creating a vhost RPMsg bus? Nice, glad to hear that!

Not quite, I hacked together a single generic vhost-rpmsg-bus driver
which just wraps the API in this patch and implements a basic
rpmsg_device_ops and rpmsg_endpoint_ops.  Then with the following
patches and no other vhost-specific API use, I was able to load and use
the same rpmsg-char and rpmsg-client-sample drivers on both host and
guest kernels.

Userspace sets up the vhost using vhost-rpmsg-bus' misc device and
triggers creation of an rpdev which leads to a probe of the (for
example) rpmsg-client-sample driver on the host (server), which, in
turn, via NS announcement, triggers a creation of an rpdev and a probe
of the rpmsg-client-sample driver on the guest (client).

diff --git a/drivers/rpmsg/rpmsg_char.c b/drivers/rpmsg/rpmsg_char.c
index a76b963a7e5..7a03978d002 100644
--- a/drivers/rpmsg/rpmsg_char.c
+++ b/drivers/rpmsg/rpmsg_char.c
@@ -104,6 +104,11 @@ static int rpmsg_ept_cb(struct rpmsg_device *rpdev, void *buf, int len,
 	struct rpmsg_eptdev *eptdev = priv;
 	struct sk_buff *skb;
 
+	if (rpdev->dst == RPMSG_ADDR_ANY) {
+		printk("%s: got client address %#x from first rx!\n", __func__, addr);
+		rpdev->dst = addr;
+	}
+
 	skb = alloc_skb(len, GFP_ATOMIC);
 	if (!skb)
 		return -ENOMEM;
@@ -235,6 +240,12 @@ static ssize_t rpmsg_eptdev_write(struct file *filp, const char __user *buf,
 		goto unlock_eptdev;
 	}
 
+	if (eptdev->rpdev->dst == RPMSG_ADDR_ANY) {
+		ret = -EPIPE;
+		WARN(1, "Cannot write first on server, must wait for client!\n");
+		goto unlock_eptdev;
+	}
+
 	if (filp->f_flags & O_NONBLOCK)
 		ret = rpmsg_trysend(eptdev->ept, kbuf, len);
 	else
diff --git a/samples/rpmsg/rpmsg_client_sample.c b/samples/rpmsg/rpmsg_client_sample.c
index f161dfd3e70..5d8ca84dce0 100644
--- a/samples/rpmsg/rpmsg_client_sample.c
+++ b/samples/rpmsg/rpmsg_client_sample.c
@@ -46,6 +46,9 @@ static int rpmsg_sample_cb(struct rpmsg_device *rpdev, void *data, int len,
 		return 0;
 	}
 
+	if (rpdev->dst == RPMSG_ADDR_ANY)
+		rpdev->dst = src;
+
 	/* send a new message now */
 	ret = rpmsg_send(rpdev->ept, MSG, strlen(MSG));
 	if (ret)
@@ -68,11 +71,13 @@ static int rpmsg_sample_probe(struct rpmsg_device *rpdev)
 
 	dev_set_drvdata(&rpdev->dev, idata);
 
-	/* send a message to our remote processor */
-	ret = rpmsg_send(rpdev->ept, MSG, strlen(MSG));
-	if (ret) {
-		dev_err(&rpdev->dev, "rpmsg_send failed: %d\n", ret);
-		return ret;
+	if (rpdev->dst != RPMSG_ADDR_ANY) {
+		/* send a message to our remote processor */
+		ret = rpmsg_send(rpdev->ept, MSG, strlen(MSG));
+		if (ret) {
+			dev_err(&rpdev->dev, "rpmsg_send failed: %d\n", ret);
+			return ret;
+		}
 	}
 
 	return 0;
Guennadi Liakhovetski June 18, 2020, 10:39 a.m. UTC | #5
On Thu, Jun 18, 2020 at 11:33:24AM +0200, Vincent Whitchurch wrote:
> On Thu, Jun 18, 2020 at 11:03:42AM +0200, Guennadi Liakhovetski wrote:
> > On Wed, Jun 17, 2020 at 09:17:42PM +0200, Vincent Whitchurch wrote:
> > > On Wed, May 27, 2020 at 08:05:41PM +0200, Guennadi Liakhovetski wrote:
> > > > Linux supports running the RPMsg protocol over the VirtIO transport
> > > > protocol, but currently there is only support for VirtIO clients and
> > > > no support for a VirtIO server. This patch adds a vhost-based RPMsg
> > > > server implementation.
> > > 
> > > This looks really useful, but why is it implemented as an API and not as
> > > a real vhost driver which implements an rpmsg bus?  If you implement it
> > > as a vhost driver which implements rpmsg_device_ops and
> > > rpmsg_endpoint_ops, then wouldn't you be able to implement your
> > > vhost-sof driver using the normal rpmsg APIs?
> > 
> > Sorry, not sure what you mean by the "normal rpmsg API?" Do you mean the 
> > VirtIO RPMsg API? But that's the opposite side of the link - that's the 
> > guest side in the VM case and the Linux side in the remoteproc case. What 
> > this API is adding is a vhost RPMsg API. The kernel vhost framework 
> > itself is essentially a library of functions. Kernel vhost drivers simply 
> > create a misc device and use the vhost functions for some common 
> > functionality. This RPMsg vhost API stays in the same concept and provides 
> > further functions for RPMsg specific vhost operation.
> 
> By the "normal rpmsg API" I mean register_rpmsg_driver(), rpmsg_send(),
> etc.  That API is not tied to virtio in any way and there are other
> non-virtio backends for this API in the tree.  So it seems quite natural
> to implement a vhost backend for this API so that both sides of the link
> can use the same API but different backends, instead of forcing them to
> use different APIs.

Ok, I see what you mean now. But I'm not sure this is useful or desired. I'm 
not an expert in KVM / VirtIO, I've only been working in the area for less 
than a year, so, I might well be wrong.

You're proposing to use the rpmsg API in vhost drivers. As far as I
understand, that API has so far only been designated for the Linux side (in
the AMP case), which corresponds to VM guests in the virtualisation case. So,
I'm not sure we want to use the same API for the hosts? This can be done as
you have illustrated, but is it desirable? The vhost API is far enough from
the VirtIO driver API, so I'm not sure why we want the same API for rpmsg?

Thanks
Guennadi

> > > I tried quickly hooking up this code to such a vhost driver and I was
> > > able to communicate between host and guest systems with both
> > > rpmsg-client-sample and rpmsg-char with almost no modifications to
> > > those drivers.
> > 
> > You mean you used this patch to create RPMsg vhost drivers? Without 
> > creating a vhost RPMsg bus? Nice, glad to hear that!
> 
> Not quite, I hacked together a single generic vhost-rpmsg-bus driver
> which just wraps the API in this patch and implements a basic
> rpmsg_device_ops and rpmsg_endpoint_ops.  Then with the following
> patches and no other vhost-specific API use, I was able to load and use
> the same rpmsg-char and rpmsg-client-sample drivers on both host and
> guest kernels.
> 
> Userspace sets up the vhost using vhost-rpmsg-bus' misc device and
> triggers creation of an rpdev which leads to a probe of the (for
> example) rpmsg-client-sample driver on the host (server), which, in
> turn, via NS announcement, triggers a creation of an rpdev and a probe
> of the rpmsg-client-sample driver on the guest (client).
> 
> diff --git a/drivers/rpmsg/rpmsg_char.c b/drivers/rpmsg/rpmsg_char.c
> index a76b963a7e5..7a03978d002 100644
> --- a/drivers/rpmsg/rpmsg_char.c
> +++ b/drivers/rpmsg/rpmsg_char.c
> @@ -104,6 +104,11 @@ static int rpmsg_ept_cb(struct rpmsg_device *rpdev, void *buf, int len,
>  	struct rpmsg_eptdev *eptdev = priv;
>  	struct sk_buff *skb;
>  
> +	if (rpdev->dst == RPMSG_ADDR_ANY) {
> +		printk("%s: got client address %#x from first rx!\n", __func__, addr);
> +		rpdev->dst = addr;
> +	}
> +
>  	skb = alloc_skb(len, GFP_ATOMIC);
>  	if (!skb)
>  		return -ENOMEM;
> @@ -235,6 +240,12 @@ static ssize_t rpmsg_eptdev_write(struct file *filp, const char __user *buf,
>  		goto unlock_eptdev;
>  	}
>  
> +	if (eptdev->rpdev->dst == RPMSG_ADDR_ANY) {
> +		ret = -EPIPE;
> +		WARN(1, "Cannot write first on server, must wait for client!\n");
> +		goto unlock_eptdev;
> +	}
> +
>  	if (filp->f_flags & O_NONBLOCK)
>  		ret = rpmsg_trysend(eptdev->ept, kbuf, len);
>  	else
> diff --git a/samples/rpmsg/rpmsg_client_sample.c b/samples/rpmsg/rpmsg_client_sample.c
> index f161dfd3e70..5d8ca84dce0 100644
> --- a/samples/rpmsg/rpmsg_client_sample.c
> +++ b/samples/rpmsg/rpmsg_client_sample.c
> @@ -46,6 +46,9 @@ static int rpmsg_sample_cb(struct rpmsg_device *rpdev, void *data, int len,
>  		return 0;
>  	}
>  
> +	if (rpdev->dst == RPMSG_ADDR_ANY)
> +		rpdev->dst = src;
> +
>  	/* send a new message now */
>  	ret = rpmsg_send(rpdev->ept, MSG, strlen(MSG));
>  	if (ret)
> @@ -68,11 +71,13 @@ static int rpmsg_sample_probe(struct rpmsg_device *rpdev)
>  
>  	dev_set_drvdata(&rpdev->dev, idata);
>  
> -	/* send a message to our remote processor */
> -	ret = rpmsg_send(rpdev->ept, MSG, strlen(MSG));
> -	if (ret) {
> -		dev_err(&rpdev->dev, "rpmsg_send failed: %d\n", ret);
> -		return ret;
> +	if (rpdev->dst != RPMSG_ADDR_ANY) {
> +		/* send a message to our remote processor */
> +		ret = rpmsg_send(rpdev->ept, MSG, strlen(MSG));
> +		if (ret) {
> +			dev_err(&rpdev->dev, "rpmsg_send failed: %d\n", ret);
> +			return ret;
> +		}
>  	}
>  
>  	return 0;
Vincent Whitchurch June 18, 2020, 1:52 p.m. UTC | #6
On Thu, Jun 18, 2020 at 12:39:40PM +0200, Guennadi Liakhovetski wrote:
> On Thu, Jun 18, 2020 at 11:33:24AM +0200, Vincent Whitchurch wrote:
> > By the "normal rpmsg API" I mean register_rpmsg_driver(), rpmsg_send(),
> > etc.  That API is not tied to virtio in any way and there are other
> > non-virtio backends for this API in the tree.  So it seems quite natural
> > to implement a vhost backend for this API so that both sides of the link
> > can use the same API but different backends, instead of forcing them to
> > use different APIs.
> 
> Ok, I see what you mean now. But I'm not sure this is useful or desired. I'm 
> not an expert in KVM / VirtIO, I've only been working in the area for less 
> than a year, so, I might well be wrong.
> 
> You're proposing to use the rpmsg API in vhost drivers. As far as I 
> understand so far that API was only designated for the Linux side (in case of 
> AMPs) which corresponds to VM guests in virtualisation case. So, I'm not sure 
> we want to use the same API for the hosts? This can be done as you have 
> illustrated, but is it desirable? The vhost API is far enough from the VirtIO 
> driver API, so I'm not sure why we want the same API for rpmsg?

Note that "the Linux side" is ambiguous for AMP since both sides can be
Linux, as they happen to be in my case.  I'm running virtio/rpmsg
between two physical processors (of different architectures), both
running Linux.

virtio has distinct driver and device roles so the completely different
APIs on each side are understandable.  But I don't see that distinction
in the rpmsg API which is why it seems like a good idea to me to make it
work from both sides of the link and allow the reuse of drivers like
rpmsg-char, instead of imposing virtio's distinction on rpmsg.
Guennadi Liakhovetski June 18, 2020, 2:14 p.m. UTC | #7
On Thu, Jun 18, 2020 at 03:52:42PM +0200, Vincent Whitchurch wrote:
> On Thu, Jun 18, 2020 at 12:39:40PM +0200, Guennadi Liakhovetski wrote:
> > On Thu, Jun 18, 2020 at 11:33:24AM +0200, Vincent Whitchurch wrote:
> > > By the "normal rpmsg API" I mean register_rpmsg_driver(), rpmsg_send(),
> > > etc.  That API is not tied to virtio in any way and there are other
> > > non-virtio backends for this API in the tree.  So it seems quite natural
> > > to implement a vhost backend for this API so that both sides of the link
> > > can use the same API but different backends, instead of forcing them to
> > > use different APIs.
> > 
> > Ok, I see what you mean now. But I'm not sure this is useful or desired. I'm 
> > not an expert in KVM / VirtIO, I've only been working in the area for less 
> > than a year, so, I might well be wrong.
> > 
> > You're proposing to use the rpmsg API in vhost drivers. As far as I 
> > understand so far that API was only designated for the Linux side (in case of 
> > AMPs) which corresponds to VM guests in virtualisation case. So, I'm not sure 
> > we want to use the same API for the hosts? This can be done as you have 
> > illustrated, but is it desirable? The vhost API is far enough from the VirtIO 
> > driver API, so I'm not sure why we want the same API for rpmsg?
> 
> Note that "the Linux side" is ambiguous for AMP since both sides can be
> Linux, as they happen to be in my case.  I'm running virtio/rpmsg
> between two physical processors (of different architectures), both
> running Linux.

Ok, interesting, I didn't know such configurations were used too. My
understanding of the Linux rpmsg implementation was that the "host" is
assumed to boot the "device" by sending an ELF-formatted executable image to
it. Is that optional? You aren't sending a complete Linux image to the device
side, are you?

> virtio has distinct driver and device roles so the completely different
> APIs on each side are understandable.  But I don't see that distinction
> in the rpmsg API which is why it seems like a good idea to me to make it
> work from both sides of the link and allow the reuse of drivers like
> rpmsg-char, instead of imposing virtio's distinction on rpmsg.

Understood. In principle I'm open to this idea, but before I implement it, it
would be good to know what the maintainers think.

Thanks
Guennadi
Vincent Whitchurch July 14, 2020, 8:33 a.m. UTC | #8
On Thu, Jun 18, 2020 at 04:14:12PM +0200, Guennadi Liakhovetski wrote:
> On Thu, Jun 18, 2020 at 03:52:42PM +0200, Vincent Whitchurch wrote:
> > Note that "the Linux side" is ambiguous for AMP since both sides can be
> > Linux, as they happen to be in my case.  I'm running virtio/rpmsg
> > between two physical processors (of different architectures), both
> > running Linux.
> 
> Ok, interesting, I didn't know such configurations were used too. I understood 
> the Linux rpmsg implementation in the way, that it's assumed, that the "host" 
> has to boot the "device" by sending an ELF formatted executable image to it, is 
> that optional? You aren't sending a complete Linux image to the device side, 
> are you?

I do pack the zImage, the dtb, and the initramfs into an ELF (along with
a tiny "bootloader" with just a handful of instructions), but the
remoteproc framework is not tied to the ELF format since ->parse_fw()
and friends are overridable by the remoteproc driver.

> > virtio has distinct driver and device roles so the completely different
> > APIs on each side are understandable.  But I don't see that distinction
> > in the rpmsg API which is why it seems like a good idea to me to make it
> > work from both sides of the link and allow the reuse of drivers like
> > rpmsg-char, instead of imposing virtio's distinction on rpmsg.
> 
> Understand. In principle I'm open to this idea, but before I implement it it 
> would be good to know what maintainers think?

Certainly.
diff mbox series

Patch

diff --git a/drivers/vhost/Kconfig b/drivers/vhost/Kconfig
index 2c75d16..8b91f3e 100644
--- a/drivers/vhost/Kconfig
+++ b/drivers/vhost/Kconfig
@@ -38,6 +38,13 @@  config VHOST_NET
 	  To compile this driver as a module, choose M here: the module will
 	  be called vhost_net.
 
+config VHOST_RPMSG
+	tristate
+	depends on VHOST
+	help
+	  Vhost RPMsg API allows vhost drivers to communicate with VirtIO
+	  drivers, using the RPMsg over VirtIO protocol.
+
 config VHOST_SCSI
 	tristate "VHOST_SCSI TCM fabric driver"
 	depends on TARGET_CORE && EVENTFD
diff --git a/drivers/vhost/Makefile b/drivers/vhost/Makefile
index f3e1897..9cf459d 100644
--- a/drivers/vhost/Makefile
+++ b/drivers/vhost/Makefile
@@ -2,6 +2,9 @@ 
 obj-$(CONFIG_VHOST_NET) += vhost_net.o
 vhost_net-y := net.o
 
+obj-$(CONFIG_VHOST_RPMSG) += vhost_rpmsg.o
+vhost_rpmsg-y := rpmsg.o
+
 obj-$(CONFIG_VHOST_SCSI) += vhost_scsi.o
 vhost_scsi-y := scsi.o
 
diff --git a/drivers/vhost/rpmsg.c b/drivers/vhost/rpmsg.c
new file mode 100644
index 00000000..ea77e1f
--- /dev/null
+++ b/drivers/vhost/rpmsg.c
@@ -0,0 +1,382 @@ 
+// SPDX-License-Identifier: GPL-2.0-only
+/*
+ * Copyright(c) 2020 Intel Corporation. All rights reserved.
+ *
+ * Author: Guennadi Liakhovetski <guennadi.liakhovetski@linux.intel.com>
+ *
+ * Vhost RPMsg VirtIO interface. It provides a set of functions to match the
+ * guest side RPMsg VirtIO API, provided by drivers/rpmsg/virtio_rpmsg_bus.c
+ * These functions handle creation of 2 virtual queues, handling of endpoint
+ * addresses, sending a name-space announcement to the guest as well as any
+ * user messages. This API can be used by any vhost driver to handle RPMsg
+ * specific processing.
+ * Specific vhost drivers, using this API will use their own VirtIO device
+ * IDs, that should then also be added to the ID table in virtio_rpmsg_bus.c
+ */
+
+#include <linux/compat.h>
+#include <linux/file.h>
+#include <linux/miscdevice.h>
+#include <linux/module.h>
+#include <linux/mutex.h>
+#include <linux/vhost.h>
+#include <linux/virtio_rpmsg.h>
+#include <uapi/linux/rpmsg.h>
+
+#include "vhost.h"
+#include "vhost_rpmsg.h"
+
+/*
+ * All virtio-rpmsg virtual queue kicks always come with just one buffer -
+ * either input or output
+ */
+static int vhost_rpmsg_get_single(struct vhost_virtqueue *vq)
+{
+	struct vhost_rpmsg *vr = container_of(vq->dev, struct vhost_rpmsg, dev);
+	unsigned int out, in;
+	int head = vhost_get_vq_desc(vq, vq->iov, ARRAY_SIZE(vq->iov),
+				     &out, &in, NULL, NULL);
+	if (head < 0) {
+		vq_err(vq, "%s(): error %d getting buffer\n",
+		       __func__, head);
+		return head;
+	}
+
+	/* Nothing new? */
+	if (head == vq->num)
+		return head;
+
+	if (vq == &vr->vq[VIRTIO_RPMSG_RESPONSE] && (out || in != 1)) {
+		vq_err(vq,
+		       "%s(): invalid %d input and %d output in response queue\n",
+		       __func__, in, out);
+		goto return_buf;
+	}
+
+	if (vq == &vr->vq[VIRTIO_RPMSG_REQUEST] && (in || out != 1)) {
+		vq_err(vq,
+		       "%s(): invalid %d input and %d output in request queue\n",
+		       __func__, in, out);
+		goto return_buf;
+	}
+
+	return head;
+
+return_buf:
+	/*
+	 * FIXME: might need to return the buffer using vhost_add_used()
+	 * or vhost_discard_vq_desc(). vhost_discard_vq_desc() is
+	 * described as "being useful for error handling," but it makes
+	 * the thus discarded buffers "unseen," so next time we look we
+	 * retrieve them again?
+	 */
+	return -EINVAL;
+}
+
+static const struct vhost_rpmsg_ept *vhost_rpmsg_ept_find(
+					struct vhost_rpmsg *vr, int addr)
+{
+	unsigned int i;
+
+	for (i = 0; i < vr->n_epts; i++)
+		if (vr->ept[i].addr == addr)
+			return vr->ept + i;
+
+	return NULL;
+}
+
+/*
+ * If len < 0: when reading a request, the complete virtual queue buffer
+ * size is prepared; when sending a response, the length in the iterator is used.
+ */
+int vhost_rpmsg_start_lock(struct vhost_rpmsg *vr,
+			   struct vhost_rpmsg_iter *iter,
+			   unsigned int qid, ssize_t len)
+	__acquires(vq->mutex)
+{
+	struct vhost_virtqueue *vq = vr->vq + qid;
+	size_t tmp;
+
+	if (qid >= VIRTIO_RPMSG_NUM_OF_VQS)
+		return -EINVAL;
+
+	iter->vq = vq;
+
+	mutex_lock(&vq->mutex);
+	vhost_disable_notify(&vr->dev, vq);
+
+	iter->head = vhost_rpmsg_get_single(vq);
+	if (iter->head == vq->num)
+		iter->head = -EAGAIN;
+
+	if (iter->head < 0)
+		goto unlock;
+
+	tmp = vq->iov[0].iov_len;
+	if (tmp < sizeof(iter->rhdr)) {
+		vq_err(vq, "%s(): size %zu too small\n", __func__, tmp);
+		iter->head = -ENOBUFS;
+		goto return_buf;
+	}
+
+	switch (qid) {
+	case VIRTIO_RPMSG_REQUEST:
+		if (len < 0) {
+			len = tmp - sizeof(iter->rhdr);
+		} else if (tmp < sizeof(iter->rhdr) + len) {
+			iter->head = -ENOBUFS;
+			goto return_buf;
+		}
+
+		/* len is now the size of the payload */
+		iov_iter_init(&iter->iov_iter, WRITE,
+			      vq->iov, 1, sizeof(iter->rhdr) + len);
+
+		/* Read the RPMSG header with endpoint addresses */
+		tmp = copy_from_iter(&iter->rhdr, sizeof(iter->rhdr),
+				     &iter->iov_iter);
+		if (tmp != sizeof(iter->rhdr)) {
+			vq_err(vq, "%s(): got %zu instead of %zu\n", __func__,
+			       tmp, sizeof(iter->rhdr));
+			iter->head = -EIO;
+			goto return_buf;
+		}
+
+		iter->ept = vhost_rpmsg_ept_find(vr, iter->rhdr.dst);
+		if (!iter->ept) {
+			vq_err(vq, "%s(): no endpoint with address %d\n",
+			       __func__, iter->rhdr.dst);
+			iter->head = -ENOENT;
+			goto return_buf;
+		}
+
+		/* Let the endpoint read the payload */
+		if (iter->ept->read) {
+			ssize_t ret = iter->ept->read(vr, iter);
+
+			if (ret < 0) {
+				iter->head = ret;
+				goto return_buf;
+			}
+
+			iter->rhdr.len = ret;
+		} else {
+			iter->rhdr.len = 0;
+		}
+
+		/* Prepare for the response phase */
+		iter->rhdr.dst = iter->rhdr.src;
+		iter->rhdr.src = iter->ept->addr;
+
+		break;
+	case VIRTIO_RPMSG_RESPONSE:
+		if (!iter->ept && iter->rhdr.dst != RPMSG_NS_ADDR) {
+			/*
+			 * Usually the iterator is configured when processing a
+			 * message on the request queue, but it's also possible
+			 * to send a message on the response queue without a
+			 * preceding request, in that case the iterator must
+			 * contain source and destination addresses.
+			 */
+			iter->ept = vhost_rpmsg_ept_find(vr, iter->rhdr.src);
+			if (!iter->ept) {
+				iter->head = -ENOENT;
+				goto return_buf;
+			}
+		}
+
+		if (len < 0) {
+			len = tmp - sizeof(iter->rhdr);
+		} else if (tmp < sizeof(iter->rhdr) + len) {
+			iter->head = -ENOBUFS;
+			goto return_buf;
+		} else {
+			iter->rhdr.len = len;
+		}
+
+		/* len is now the size of the payload */
+		iov_iter_init(&iter->iov_iter, READ,
+			      vq->iov, 1, sizeof(iter->rhdr) + len);
+
+		/* Write the RPMSG header with endpoint addresses */
+		tmp = copy_to_iter(&iter->rhdr, sizeof(iter->rhdr),
+				   &iter->iov_iter);
+		if (tmp != sizeof(iter->rhdr)) {
+			iter->head = -EIO;
+			goto return_buf;
+		}
+
+		/* Let the endpoint write the payload */
+		if (iter->ept && iter->ept->write) {
+			ssize_t ret = iter->ept->write(vr, iter);
+
+			if (ret < 0) {
+				iter->head = ret;
+				goto return_buf;
+			}
+		}
+
+		break;
+	}
+
+	return 0;
+
+return_buf:
+	/*
+	 * FIXME: vhost_discard_vq_desc() or vhost_add_used(), see comment in
+	 * vhost_rpmsg_get_single()
+	 */
+unlock:
+	vhost_enable_notify(&vr->dev, vq);
+	mutex_unlock(&vq->mutex);
+
+	return iter->head;
+}
+EXPORT_SYMBOL_GPL(vhost_rpmsg_start_lock);
+
+size_t vhost_rpmsg_copy(struct vhost_rpmsg *vr, struct vhost_rpmsg_iter *iter,
+			void *data, size_t size)
+{
+	/*
+	 * We could check for excess data, but copy_{to,from}_iter() don't do
+	 * that either
+	 */
+	if (iter->vq == vr->vq + VIRTIO_RPMSG_RESPONSE)
+		return copy_to_iter(data, size, &iter->iov_iter);
+
+	return copy_from_iter(data, size, &iter->iov_iter);
+}
+EXPORT_SYMBOL_GPL(vhost_rpmsg_copy);
+
+int vhost_rpmsg_finish_unlock(struct vhost_rpmsg *vr,
+			      struct vhost_rpmsg_iter *iter)
+	__releases(vq->mutex)
+{
+	if (iter->head >= 0)
+		vhost_add_used_and_signal(iter->vq->dev, iter->vq, iter->head,
+					  iter->rhdr.len + sizeof(iter->rhdr));
+
+	vhost_enable_notify(&vr->dev, iter->vq);
+	mutex_unlock(&iter->vq->mutex);
+
+	return iter->head;
+}
+EXPORT_SYMBOL_GPL(vhost_rpmsg_finish_unlock);
+
+/*
+ * Return false to terminate the external loop only if we fail to obtain either
+ * a request or a response buffer
+ */
+static bool handle_rpmsg_req_single(struct vhost_rpmsg *vr,
+				    struct vhost_virtqueue *vq)
+{
+	struct vhost_rpmsg_iter iter;
+	int ret = vhost_rpmsg_start_lock(vr, &iter, VIRTIO_RPMSG_REQUEST,
+					 -EINVAL);
+	if (!ret)
+		ret = vhost_rpmsg_finish_unlock(vr, &iter);
+	if (ret < 0) {
+		if (ret != -EAGAIN)
+			vq_err(vq, "%s(): RPMSG processing failed %d\n",
+			       __func__, ret);
+		return false;
+	}
+
+	if (!iter.ept->write)
+		return true;
+
+	ret = vhost_rpmsg_start_lock(vr, &iter, VIRTIO_RPMSG_RESPONSE,
+				     -EINVAL);
+	if (!ret)
+		ret = vhost_rpmsg_finish_unlock(vr, &iter);
+	if (ret < 0) {
+		vq_err(vq, "%s(): RPMSG finalising failed %d\n", __func__, ret);
+		return false;
+	}
+
+	return true;
+}
+
+static void handle_rpmsg_req_kick(struct vhost_work *work)
+{
+	struct vhost_virtqueue *vq = container_of(work, struct vhost_virtqueue,
+						  poll.work);
+	struct vhost_rpmsg *vr = container_of(vq->dev, struct vhost_rpmsg, dev);
+
+	while (handle_rpmsg_req_single(vr, vq))
+		;
+}
+
+/*
+ * initialise two virtqueues with an array of endpoints,
+ * request and response callbacks
+ */
+void vhost_rpmsg_init(struct vhost_rpmsg *vr, const struct vhost_rpmsg_ept *ept,
+		      unsigned int n_epts)
+{
+	unsigned int i;
+
+	for (i = 0; i < ARRAY_SIZE(vr->vq); i++)
+		vr->vq_p[i] = &vr->vq[i];
+
+	/* vq[0]: host -> guest, vq[1]: host <- guest */
+	vr->vq[VIRTIO_RPMSG_REQUEST].handle_kick = handle_rpmsg_req_kick;
+	vr->vq[VIRTIO_RPMSG_RESPONSE].handle_kick = NULL;
+
+	vr->ept = ept;
+	vr->n_epts = n_epts;
+
+	vhost_dev_init(&vr->dev, vr->vq_p, VIRTIO_RPMSG_NUM_OF_VQS,
+		       UIO_MAXIOV, 0, 0, NULL);
+}
+EXPORT_SYMBOL_GPL(vhost_rpmsg_init);
+
+void vhost_rpmsg_destroy(struct vhost_rpmsg *vr)
+{
+	if (vhost_dev_has_owner(&vr->dev))
+		vhost_poll_flush(&vr->vq[VIRTIO_RPMSG_REQUEST].poll);
+
+	vhost_dev_cleanup(&vr->dev);
+}
+EXPORT_SYMBOL_GPL(vhost_rpmsg_destroy);
+
+/* send namespace */
+int vhost_rpmsg_ns_announce(struct vhost_rpmsg *vr, const char *name,
+			    unsigned int src)
+{
+	struct vhost_rpmsg_iter iter = {
+		.rhdr = {
+			.src = 0,
+			.dst = RPMSG_NS_ADDR,
+			.flags = RPMSG_NS_CREATE, /* rpmsg_recv_single() */
+		},
+	};
+	struct rpmsg_ns_msg ns = {
+		.addr = src,
+		.flags = RPMSG_NS_CREATE, /* for rpmsg_ns_cb() */
+	};
+	int ret = vhost_rpmsg_start_lock(vr, &iter, VIRTIO_RPMSG_RESPONSE,
+					 sizeof(ns));
+
+	if (ret < 0)
+		return ret;
+
+	strlcpy(ns.name, name, sizeof(ns.name));
+
+	ret = vhost_rpmsg_copy(vr, &iter, &ns, sizeof(ns));
+	if (ret != sizeof(ns))
+		vq_err(iter.vq, "%s(): added %d instead of %zu bytes\n",
+		       __func__, ret, sizeof(ns));
+
+	ret = vhost_rpmsg_finish_unlock(vr, &iter);
+	if (ret < 0)
+		vq_err(iter.vq, "%s(): namespace announcement failed: %d\n",
+		       __func__, ret);
+
+	return ret;
+}
+EXPORT_SYMBOL_GPL(vhost_rpmsg_ns_announce);
+
+MODULE_LICENSE("GPL v2");
+MODULE_AUTHOR("Intel, Inc.");
+MODULE_DESCRIPTION("Vhost RPMsg API");
diff --git a/drivers/vhost/vhost_rpmsg.h b/drivers/vhost/vhost_rpmsg.h
new file mode 100644
index 00000000..a3d0dda
--- /dev/null
+++ b/drivers/vhost/vhost_rpmsg.h
@@ -0,0 +1,74 @@ 
+/* SPDX-License-Identifier: GPL-2.0 */
+/*
+ * Copyright(c) 2020 Intel Corporation. All rights reserved.
+ *
+ * Author: Guennadi Liakhovetski <guennadi.liakhovetski@linux.intel.com>
+ */
+
+#ifndef VHOST_RPMSG_H
+#define VHOST_RPMSG_H
+
+#include <linux/uio.h>
+#include <linux/virtio_rpmsg.h>
+
+#include "vhost.h"
+
+/* RPMsg uses two VirtQueues: one for each direction */
+enum {
+	VIRTIO_RPMSG_RESPONSE,	/* RPMsg response (host->guest) buffers */
+	VIRTIO_RPMSG_REQUEST,	/* RPMsg request (guest->host) buffers */
+	/* Keep last */
+	VIRTIO_RPMSG_NUM_OF_VQS,
+};
+
+struct vhost_rpmsg_ept;
+
+struct vhost_rpmsg_iter {
+	struct iov_iter iov_iter;
+	struct rpmsg_hdr rhdr;
+	struct vhost_virtqueue *vq;
+	const struct vhost_rpmsg_ept *ept;
+	int head;
+	void *priv;
+};
+
+struct vhost_rpmsg {
+	struct vhost_dev dev;
+	struct vhost_virtqueue vq[VIRTIO_RPMSG_NUM_OF_VQS];
+	struct vhost_virtqueue *vq_p[VIRTIO_RPMSG_NUM_OF_VQS];
+	const struct vhost_rpmsg_ept *ept;
+	unsigned int n_epts;
+};
+
+struct vhost_rpmsg_ept {
+	ssize_t (*read)(struct vhost_rpmsg *, struct vhost_rpmsg_iter *);
+	ssize_t (*write)(struct vhost_rpmsg *, struct vhost_rpmsg_iter *);
+	int addr;
+};
+
+static inline size_t vhost_rpmsg_iter_len(const struct vhost_rpmsg_iter *iter)
+{
+	return iter->rhdr.len;
+}
+
+#define VHOST_RPMSG_ITER(_src, _dst) {	\
+	.rhdr = {			\
+			.src = _src,	\
+			.dst = _dst,	\
+		},			\
+	}
+
+void vhost_rpmsg_init(struct vhost_rpmsg *vr, const struct vhost_rpmsg_ept *ept,
+		      unsigned int n_epts);
+void vhost_rpmsg_destroy(struct vhost_rpmsg *vr);
+int vhost_rpmsg_ns_announce(struct vhost_rpmsg *vr, const char *name,
+			    unsigned int src);
+int vhost_rpmsg_start_lock(struct vhost_rpmsg *vr,
+			   struct vhost_rpmsg_iter *iter,
+			   unsigned int qid, ssize_t len);
+size_t vhost_rpmsg_copy(struct vhost_rpmsg *vr, struct vhost_rpmsg_iter *iter,
+			void *data, size_t size);
+int vhost_rpmsg_finish_unlock(struct vhost_rpmsg *vr,
+			      struct vhost_rpmsg_iter *iter);
+
+#endif