
[RFC,v4,08/21] vfio-user: define socket receive functions

Message ID a89cfd3195740dfb313d1947c0c7de583e4d0f46.1641584317.git.john.g.johnson@oracle.com (mailing list archive)
State New, archived
Series vfio-user client

Commit Message

John Johnson Jan. 12, 2022, 12:43 a.m. UTC
Add infrastructure needed to receive incoming messages

Signed-off-by: John G Johnson <john.g.johnson@oracle.com>
Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
---
 hw/vfio/user-protocol.h |  54 ++++++++
 hw/vfio/user.h          |   6 +
 hw/vfio/pci.c           |   6 +
 hw/vfio/user.c          | 327 ++++++++++++++++++++++++++++++++++++++++++++++++
 MAINTAINERS             |   1 +
 5 files changed, 394 insertions(+)
 create mode 100644 hw/vfio/user-protocol.h

Comments

Thanos Makatos Feb. 3, 2022, 9:53 p.m. UTC | #1
> -----Original Message-----
> From: Qemu-devel <qemu-devel-
> bounces+thanos.makatos=nutanix.com@nongnu.org> On Behalf Of John
> Johnson
> Sent: 12 January 2022 00:44
> To: qemu-devel@nongnu.org
> Subject: [RFC v4 08/21] vfio-user: define socket receive functions
> 
> Add infrastructure needed to receive incoming messages
> 
> Signed-off-by: John G Johnson <john.g.johnson@oracle.com>
> Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
> Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
> ---
>  hw/vfio/user-protocol.h |  54 ++++++++
>  hw/vfio/user.h          |   6 +
>  hw/vfio/pci.c           |   6 +
>  hw/vfio/user.c          | 327
> ++++++++++++++++++++++++++++++++++++++++++++++++
>  MAINTAINERS             |   1 +
>  5 files changed, 394 insertions(+)
>  create mode 100644 hw/vfio/user-protocol.h
> 
> diff --git a/hw/vfio/user-protocol.h b/hw/vfio/user-protocol.h
> new file mode 100644
> index 0000000..d23877c
> --- /dev/null
> +++ b/hw/vfio/user-protocol.h
> @@ -0,0 +1,54 @@
> +#ifndef VFIO_USER_PROTOCOL_H
> +#define VFIO_USER_PROTOCOL_H
> +
> +/*
> + * vfio protocol over a UNIX socket.
> + *
> + * Copyright © 2018, 2021 Oracle and/or its affiliates.
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2.  See
> + * the COPYING file in the top-level directory.
> + *
> + * Each message has a standard header that describes the command
> + * being sent, which is almost always a VFIO ioctl().
> + *
> + * The header may be followed by command-specific data, such as the
> + * region and offset info for read and write commands.
> + */
> +
> +typedef struct {
> +    uint16_t id;
> +    uint16_t command;
> +    uint32_t size;
> +    uint32_t flags;
> +    uint32_t error_reply;
> +} VFIOUserHdr;
> +
> +/* VFIOUserHdr commands */
> +enum vfio_user_command {
> +    VFIO_USER_VERSION                   = 1,
> +    VFIO_USER_DMA_MAP                   = 2,
> +    VFIO_USER_DMA_UNMAP                 = 3,
> +    VFIO_USER_DEVICE_GET_INFO           = 4,
> +    VFIO_USER_DEVICE_GET_REGION_INFO    = 5,
> +    VFIO_USER_DEVICE_GET_REGION_IO_FDS  = 6,
> +    VFIO_USER_DEVICE_GET_IRQ_INFO       = 7,
> +    VFIO_USER_DEVICE_SET_IRQS           = 8,
> +    VFIO_USER_REGION_READ               = 9,
> +    VFIO_USER_REGION_WRITE              = 10,
> +    VFIO_USER_DMA_READ                  = 11,
> +    VFIO_USER_DMA_WRITE                 = 12,
> +    VFIO_USER_DEVICE_RESET              = 13,
> +    VFIO_USER_DIRTY_PAGES               = 14,
> +    VFIO_USER_MAX,
> +};
> +
> +/* VFIOUserHdr flags */
> +#define VFIO_USER_REQUEST       0x0
> +#define VFIO_USER_REPLY         0x1
> +#define VFIO_USER_TYPE          0xF
> +
> +#define VFIO_USER_NO_REPLY      0x10
> +#define VFIO_USER_ERROR         0x20
> +
> +#endif /* VFIO_USER_PROTOCOL_H */
> diff --git a/hw/vfio/user.h b/hw/vfio/user.h
> index da92862..72eefa7 100644
> --- a/hw/vfio/user.h
> +++ b/hw/vfio/user.h
> @@ -11,6 +11,8 @@
>   *
>   */
> 
> +#include "user-protocol.h"
> +
>  typedef struct {
>      int send_fds;
>      int recv_fds;
> @@ -27,6 +29,7 @@ enum msg_type {
> 
>  typedef struct VFIOUserMsg {
>      QTAILQ_ENTRY(VFIOUserMsg) next;
> +    VFIOUserHdr *hdr;
>      VFIOUserFDs *fds;
>      uint32_t rsize;
>      uint32_t id;
> @@ -74,5 +77,8 @@ typedef struct VFIOProxy {
> 
>  VFIOProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp);
>  void vfio_user_disconnect(VFIOProxy *proxy);
> +void vfio_user_set_handler(VFIODevice *vbasedev,
> +                           void (*handler)(void *opaque, VFIOUserMsg *msg),
> +                           void *reqarg);
> 
>  #endif /* VFIO_USER_H */
> diff --git a/hw/vfio/pci.c b/hw/vfio/pci.c
> index 9fd7c07..0de915d 100644
> --- a/hw/vfio/pci.c
> +++ b/hw/vfio/pci.c
> @@ -3386,6 +3386,11 @@ type_init(register_vfio_pci_dev_type)
>   * vfio-user routines.
>   */
> 
> +static void vfio_user_pci_process_req(void *opaque, VFIOUserMsg *msg)
> +{
> +
> +}
> +
>  /*
>   * Emulated devices don't use host hot reset
>   */
> @@ -3432,6 +3437,7 @@ static void vfio_user_pci_realize(PCIDevice *pdev,
> Error **errp)
>          return;
>      }
>      vbasedev->proxy = proxy;
> +    vfio_user_set_handler(vbasedev, vfio_user_pci_process_req, vdev);
> 
>      vbasedev->name = g_strdup_printf("VFIO user <%s>", udev->sock_name);
>      vbasedev->dev = DEVICE(vdev);
> diff --git a/hw/vfio/user.c b/hw/vfio/user.c
> index c843f90..e1dfd5d 100644
> --- a/hw/vfio/user.c
> +++ b/hw/vfio/user.c
> @@ -25,10 +25,26 @@
>  #include "sysemu/iothread.h"
>  #include "user.h"
> 
> +static uint64_t max_xfer_size;
>  static IOThread *vfio_user_iothread;
> 
>  static void vfio_user_shutdown(VFIOProxy *proxy);
> +static VFIOUserMsg *vfio_user_getmsg(VFIOProxy *proxy, VFIOUserHdr *hdr,
> +                                     VFIOUserFDs *fds);
> +static VFIOUserFDs *vfio_user_getfds(int numfds);
> +static void vfio_user_recycle(VFIOProxy *proxy, VFIOUserMsg *msg);
> 
> +static void vfio_user_recv(void *opaque);
> +static int vfio_user_recv_one(VFIOProxy *proxy);
> +static void vfio_user_cb(void *opaque);
> +
> +static void vfio_user_request(void *opaque);
> +
> +static inline void vfio_user_set_error(VFIOUserHdr *hdr, uint32_t err)
> +{
> +    hdr->flags |= VFIO_USER_ERROR;
> +    hdr->error_reply = err;
> +}
> 
>  /*
>   * Functions called by main, CPU, or iothread threads
> @@ -40,10 +56,261 @@ static void vfio_user_shutdown(VFIOProxy *proxy)
>      qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx, NULL, NULL, NULL);
>  }
> 
> +static VFIOUserMsg *vfio_user_getmsg(VFIOProxy *proxy, VFIOUserHdr *hdr,
> +                                     VFIOUserFDs *fds)
> +{
> +    VFIOUserMsg *msg;
> +
> +    msg = QTAILQ_FIRST(&proxy->free);
> +    if (msg != NULL) {
> +        QTAILQ_REMOVE(&proxy->free, msg, next);
> +    } else {
> +        msg = g_malloc0(sizeof(*msg));
> +        qemu_cond_init(&msg->cv);
> +    }
> +
> +    msg->hdr = hdr;
> +    msg->fds = fds;
> +    return msg;
> +}
> +
> +/*
> + * Recycle a message list entry to the free list.
> + */
> +static void vfio_user_recycle(VFIOProxy *proxy, VFIOUserMsg *msg)
> +{
> +    if (msg->type == VFIO_MSG_NONE) {
> +        error_printf("vfio_user_recycle - freeing free msg\n");
> +        return;
> +    }
> +
> +    /* free msg buffer if no one is waiting to consume the reply */
> +    if (msg->type == VFIO_MSG_NOWAIT || msg->type == VFIO_MSG_ASYNC) {
> +        g_free(msg->hdr);
> +        if (msg->fds != NULL) {
> +            g_free(msg->fds);
> +        }
> +    }
> +
> +    msg->type = VFIO_MSG_NONE;
> +    msg->hdr = NULL;
> +    msg->fds = NULL;
> +    msg->complete = false;
> +    QTAILQ_INSERT_HEAD(&proxy->free, msg, next);
> +}
> +
> +static VFIOUserFDs *vfio_user_getfds(int numfds)
> +{
> +    VFIOUserFDs *fds = g_malloc0(sizeof(*fds) + (numfds * sizeof(int)));
> +
> +    fds->fds = (int *)((char *)fds + sizeof(*fds));
> +
> +    return fds;
> +}
> +
>  /*
>   * Functions only called by iothread
>   */
> 
> +static void vfio_user_recv(void *opaque)
> +{
> +    VFIOProxy *proxy = opaque;
> +
> +    QEMU_LOCK_GUARD(&proxy->lock);
> +
> +    if (proxy->state == VFIO_PROXY_CONNECTED) {
> +        while (vfio_user_recv_one(proxy) == 0) {
> +            ;
> +        }
> +    }
> +}
> +
> +/*
> + * Receive and process one incoming message.
> + *
> + * For replies, find matching outgoing request and wake any waiters.
> + * For requests, queue in incoming list and run request BH.
> + */
> +static int vfio_user_recv_one(VFIOProxy *proxy)
> +{
> +    VFIOUserMsg *msg = NULL;
> +    g_autofree int *fdp = NULL;
> +    VFIOUserFDs *reqfds;
> +    VFIOUserHdr hdr;
> +    struct iovec iov = {
> +        .iov_base = &hdr,
> +        .iov_len = sizeof(hdr),
> +    };
> +    bool isreply = false;
> +    int i, ret;
> +    size_t msgleft, numfds = 0;
> +    char *data = NULL;
> +    char *buf = NULL;
> +    Error *local_err = NULL;
> +
> +    /*
> +     * Read header
> +     */
> +    ret = qio_channel_readv_full(proxy->ioc, &iov, 1, &fdp, &numfds,
> +                                 &local_err);
> +    if (ret == QIO_CHANNEL_ERR_BLOCK) {
> +        return ret;
> +    }
> +    if (ret <= 0) {
> +        /* read error or other side closed connection */
> +        if (ret == 0) {
> +            error_setg(&local_err, "vfio_user_recv server closed socket");
> +        } else {
> +            error_prepend(&local_err, "vfio_user_recv");
> +        }
> +        goto fatal;
> +    }
> +    if (ret < sizeof(msg)) {
> +        error_setg(&local_err, "vfio_user_recv short read of header");
> +        goto fatal;
> +    }
> +
> +    /*
> +     * Validate header
> +     */
> +    if (hdr.size < sizeof(VFIOUserHdr)) {
> +        error_setg(&local_err, "vfio_user_recv bad header size");
> +        goto fatal;
> +    }
> +    switch (hdr.flags & VFIO_USER_TYPE) {
> +    case VFIO_USER_REQUEST:
> +        isreply = false;
> +        break;
> +    case VFIO_USER_REPLY:
> +        isreply = true;
> +        break;
> +    default:
> +        error_setg(&local_err, "vfio_user_recv unknown message type");
> +        goto fatal;
> +    }
> +
> +    /*
> +     * For replies, find the matching pending request.
> +     * For requests, reap incoming FDs.
> +     */
> +    if (isreply) {
> +        QTAILQ_FOREACH(msg, &proxy->pending, next) {
> +            if (hdr.id == msg->id) {
> +                break;
> +            }
> +        }
> +        if (msg == NULL) {
> +            error_setg(&local_err, "vfio_user_recv unexpected reply");
> +            goto err;
> +        }
> +        QTAILQ_REMOVE(&proxy->pending, msg, next);
> +
> +        /*
> +         * Process any received FDs
> +         */
> +        if (numfds != 0) {
> +            if (msg->fds == NULL || msg->fds->recv_fds < numfds) {
> +                error_setg(&local_err, "vfio_user_recv unexpected FDs");
> +                goto err;
> +            }
> +            msg->fds->recv_fds = numfds;
> +            memcpy(msg->fds->fds, fdp, numfds * sizeof(int));
> +        }
> +    } else {
> +        if (numfds != 0) {
> +            reqfds = vfio_user_getfds(numfds);
> +            memcpy(reqfds->fds, fdp, numfds * sizeof(int));
> +        } else {
> +            reqfds = NULL;
> +        }
> +    }
> +
> +    /*
> +     * Put the whole message into a single buffer.
> +     */
> +    if (isreply) {
> +        if (hdr.size > msg->rsize) {
> +            error_setg(&local_err,
> +                       "vfio_user_recv reply larger than recv buffer");
> +            goto err;
> +        }
> +        *msg->hdr = hdr;
> +        data = (char *)msg->hdr + sizeof(hdr);
> +    } else {
> +        if (hdr.size > max_xfer_size) {
> +            error_setg(&local_err, "vfio_user_recv request larger than max");
> +            goto err;
> +        }
> +        buf = g_malloc0(hdr.size);
> +        memcpy(buf, &hdr, sizeof(hdr));
> +        data = buf + sizeof(hdr);
> +        msg = vfio_user_getmsg(proxy, (VFIOUserHdr *)buf, reqfds);
> +        msg->type = VFIO_MSG_REQ;
> +    }
> +
> +    msgleft = hdr.size - sizeof(hdr);
> +    while (msgleft > 0) {
> +        ret = qio_channel_read(proxy->ioc, data, msgleft, &local_err);
> +
> +        /* error or would block */
> +        if (ret < 0) {
> +            goto fatal;
> +        }

IIUC qio_channel_read() ends up calling qio_channel_socket_readv() which can return QIO_CHANNEL_ERR_BLOCK (-2). The if will be taken so local_err is NULL and that causes a segfault when error_report_err(local_err) is called before returning from this function.
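
One minimal shape such a fix could take (an illustrative sketch, not code from this series; it only addresses the NULL-Error crash, and whether ERR_BLOCK should abort the message at all is discussed further down the thread):

    msgleft = hdr.size - sizeof(hdr);
    while (msgleft > 0) {
        ret = qio_channel_read(proxy->ioc, data, msgleft, &local_err);

        /* QIO_CHANNEL_ERR_BLOCK (-2) does not set local_err */
        if (ret == QIO_CHANNEL_ERR_BLOCK) {
            error_setg(&local_err, "vfio_user_recv short read of body");
            goto fatal;
        }
        /* read error (local_err already set) or EOF */
        if (ret <= 0) {
            if (ret == 0) {
                error_setg(&local_err, "vfio_user_recv server closed socket");
            }
            goto fatal;
        }

        msgleft -= ret;
        data += ret;
    }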

> +
> +        msgleft -= ret;
> +        data += ret;
> +    }
> +
> +    /*
> +     * Replies signal a waiter, if none just check for errors
> +     * and free the message buffer.
> +     *
> +     * Requests get queued for the BH.
> +     */
> +    if (isreply) {
> +        msg->complete = true;
> +        if (msg->type == VFIO_MSG_WAIT) {
> +            qemu_cond_signal(&msg->cv);
> +        } else {
> +            if (hdr.flags & VFIO_USER_ERROR) {
> +                error_printf("vfio_user_rcv error reply on async request ");
> +                error_printf("command %x error %s\n", hdr.command,
> +                             strerror(hdr.error_reply));
> +            }
> +            /* youngest nowait msg has been ack'd */
> +            if (proxy->last_nowait == msg) {
> +                proxy->last_nowait = NULL;
> +            }
> +            vfio_user_recycle(proxy, msg);
> +        }
> +    } else {
> +        QTAILQ_INSERT_TAIL(&proxy->incoming, msg, next);
> +        qemu_bh_schedule(proxy->req_bh);
> +    }
> +    return 0;
> +
> +    /*
> +     * fatal means the other side closed or we don't trust the stream
> +     * err means this message is corrupt
> +     */
> +fatal:
> +    vfio_user_shutdown(proxy);
> +    proxy->state = VFIO_PROXY_ERROR;
> +
> +err:
> +    for (i = 0; i < numfds; i++) {
> +        close(fdp[i]);
> +    }
> +    if (isreply && msg != NULL) {
> +        /* force an error to keep sending thread from hanging */
> +        vfio_user_set_error(msg->hdr, EINVAL);
> +        msg->complete = true;
> +        qemu_cond_signal(&msg->cv);
> +    }
> +    error_report_err(local_err);
> +    return -1;
> +}
> +
>  static void vfio_user_cb(void *opaque)
>  {
>      VFIOProxy *proxy = opaque;
> @@ -59,6 +326,51 @@ static void vfio_user_cb(void *opaque)
>   * Functions called by main or CPU threads
>   */
> 
> +/*
> + * Process incoming requests.
> + *
> + * The bus-specific callback has the form:
> + *    request(opaque, msg)
> + * where 'opaque' was specified in vfio_user_set_handler
> + * and 'msg' is the inbound message.
> + *
> + * The callback is responsible for disposing of the message buffer,
> + * usually by re-using it when calling vfio_send_reply or vfio_send_error,
> + * both of which free their message buffer when the reply is sent.
> + *
> + * If the callback uses a new buffer, it needs to free the old one.
> + */
> +static void vfio_user_request(void *opaque)
> +{
> +    VFIOProxy *proxy = opaque;
> +    VFIOUserMsgQ new, free;
> +    VFIOUserMsg *msg, *m1;
> +
> +    /* reap all incoming */
> +    QTAILQ_INIT(&new);
> +    WITH_QEMU_LOCK_GUARD(&proxy->lock) {
> +        QTAILQ_FOREACH_SAFE(msg, &proxy->incoming, next, m1) {
> +            QTAILQ_REMOVE(&proxy->pending, msg, next);
> +            QTAILQ_INSERT_TAIL(&new, msg, next);
> +        }
> +    }
> +
> +    /* process list */
> +    QTAILQ_INIT(&free);
> +    QTAILQ_FOREACH_SAFE(msg, &new, next, m1) {
> +        QTAILQ_REMOVE(&new, msg, next);
> +        proxy->request(proxy->req_arg, msg);
> +        QTAILQ_INSERT_HEAD(&free, msg, next);
> +    }
> +
> +    /* free list */
> +    WITH_QEMU_LOCK_GUARD(&proxy->lock) {
> +        QTAILQ_FOREACH_SAFE(msg, &free, next, m1) {
> +            vfio_user_recycle(proxy, msg);
> +        }
> +    }
> +}
> +
>  static QLIST_HEAD(, VFIOProxy) vfio_user_sockets =
>      QLIST_HEAD_INITIALIZER(vfio_user_sockets);
> 
> @@ -97,6 +409,7 @@ VFIOProxy *vfio_user_connect_dev(SocketAddress
> *addr, Error **errp)
>      }
> 
>      proxy->ctx = iothread_get_aio_context(vfio_user_iothread);
> +    proxy->req_bh = qemu_bh_new(vfio_user_request, proxy);
> 
>      QTAILQ_INIT(&proxy->outgoing);
>      QTAILQ_INIT(&proxy->incoming);
> @@ -107,6 +420,18 @@ VFIOProxy *vfio_user_connect_dev(SocketAddress
> *addr, Error **errp)
>      return proxy;
>  }
> 
> +void vfio_user_set_handler(VFIODevice *vbasedev,
> +                           void (*handler)(void *opaque, VFIOUserMsg *msg),
> +                           void *req_arg)
> +{
> +    VFIOProxy *proxy = vbasedev->proxy;
> +
> +    proxy->request = handler;
> +    proxy->req_arg = req_arg;
> +    qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx,
> +                                   vfio_user_recv, NULL, proxy);
> +}
> +
>  void vfio_user_disconnect(VFIOProxy *proxy)
>  {
>      VFIOUserMsg *r1, *r2;
> @@ -122,6 +447,8 @@ void vfio_user_disconnect(VFIOProxy *proxy)
>      }
>      object_unref(OBJECT(proxy->ioc));
>      proxy->ioc = NULL;
> +    qemu_bh_delete(proxy->req_bh);
> +    proxy->req_bh = NULL;
> 
>      proxy->state = VFIO_PROXY_CLOSING;
>      QTAILQ_FOREACH_SAFE(r1, &proxy->outgoing, next, r2) {
> diff --git a/MAINTAINERS b/MAINTAINERS
> index cfaccbf..bc0ba88 100644
> --- a/MAINTAINERS
> +++ b/MAINTAINERS
> @@ -1909,6 +1909,7 @@ S: Supported
>  F: docs/devel/vfio-user.rst
>  F: hw/vfio/user.c
>  F: hw/vfio/user.h
> +F: hw/vfio/user-protocol.h
> 
>  vhost
>  M: Michael S. Tsirkin <mst@redhat.com>
> --
> 1.8.3.1
>
Thanos Makatos Feb. 4, 2022, 12:42 p.m. UTC | #2
> -----Original Message-----
> From: Qemu-devel <qemu-devel-
> bounces+thanos.makatos=nutanix.com@nongnu.org> On Behalf Of Thanos
> Makatos
> Sent: 03 February 2022 21:54
> To: John Johnson <john.g.johnson@oracle.com>; qemu-devel@nongnu.org
> Subject: RE: [RFC v4 08/21] vfio-user: define socket receive functions
> 
> 
> 
> > -----Original Message-----
> > From: Qemu-devel <qemu-devel-
> > bounces+thanos.makatos=nutanix.com@nongnu.org> On Behalf Of John
> > Johnson
> > Sent: 12 January 2022 00:44
> > To: qemu-devel@nongnu.org
> > Subject: [RFC v4 08/21] vfio-user: define socket receive functions
> >
> > Add infrastructure needed to receive incoming messages
> >
> > Signed-off-by: John G Johnson <john.g.johnson@oracle.com>
> > Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
> > Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
> > ---
> >  hw/vfio/user-protocol.h |  54 ++++++++
> >  hw/vfio/user.h          |   6 +
> >  hw/vfio/pci.c           |   6 +
> >  hw/vfio/user.c          | 327
> > ++++++++++++++++++++++++++++++++++++++++++++++++
> >  MAINTAINERS             |   1 +
> >  5 files changed, 394 insertions(+)
> >  create mode 100644 hw/vfio/user-protocol.h
> >
> > diff --git a/hw/vfio/user-protocol.h b/hw/vfio/user-protocol.h
> > new file mode 100644
> > index 0000000..d23877c
> > --- /dev/null
> > +++ b/hw/vfio/user-protocol.h
> > @@ -0,0 +1,54 @@
> > +#ifndef VFIO_USER_PROTOCOL_H
> > +#define VFIO_USER_PROTOCOL_H
> > +
> > +/*
> > + * vfio protocol over a UNIX socket.
> > + *
> > + * Copyright © 2018, 2021 Oracle and/or its affiliates.
> > + *
> > + * This work is licensed under the terms of the GNU GPL, version 2.  See
> > + * the COPYING file in the top-level directory.
> > + *
> > + * Each message has a standard header that describes the command
> > + * being sent, which is almost always a VFIO ioctl().
> > + *
> > + * The header may be followed by command-specific data, such as the
> > + * region and offset info for read and write commands.
> > + */
> > +
> > +typedef struct {
> > +    uint16_t id;
> > +    uint16_t command;
> > +    uint32_t size;
> > +    uint32_t flags;
> > +    uint32_t error_reply;
> > +} VFIOUserHdr;
> > +
> > +/* VFIOUserHdr commands */
> > +enum vfio_user_command {
> > +    VFIO_USER_VERSION                   = 1,
> > +    VFIO_USER_DMA_MAP                   = 2,
> > +    VFIO_USER_DMA_UNMAP                 = 3,
> > +    VFIO_USER_DEVICE_GET_INFO           = 4,
> > +    VFIO_USER_DEVICE_GET_REGION_INFO    = 5,
> > +    VFIO_USER_DEVICE_GET_REGION_IO_FDS  = 6,
> > +    VFIO_USER_DEVICE_GET_IRQ_INFO       = 7,
> > +    VFIO_USER_DEVICE_SET_IRQS           = 8,
> > +    VFIO_USER_REGION_READ               = 9,
> > +    VFIO_USER_REGION_WRITE              = 10,
> > +    VFIO_USER_DMA_READ                  = 11,
> > +    VFIO_USER_DMA_WRITE                 = 12,
> > +    VFIO_USER_DEVICE_RESET              = 13,
> > +    VFIO_USER_DIRTY_PAGES               = 14,
> > +    VFIO_USER_MAX,
> > +};
> > +
> > +/* VFIOUserHdr flags */
> > +#define VFIO_USER_REQUEST       0x0
> > +#define VFIO_USER_REPLY         0x1
> > +#define VFIO_USER_TYPE          0xF
> > +
> > +#define VFIO_USER_NO_REPLY      0x10
> > +#define VFIO_USER_ERROR         0x20
> > +
> > +#endif /* VFIO_USER_PROTOCOL_H */
> > diff --git a/hw/vfio/user.h b/hw/vfio/user.h
> > index da92862..72eefa7 100644
> > --- a/hw/vfio/user.h
> > +++ b/hw/vfio/user.h
> > @@ -11,6 +11,8 @@
> >   *
> >   */
> >
> > +#include "user-protocol.h"
> > +
> >  typedef struct {
> >      int send_fds;
> >      int recv_fds;
> > @@ -27,6 +29,7 @@ enum msg_type {
> >
> >  typedef struct VFIOUserMsg {
> >      QTAILQ_ENTRY(VFIOUserMsg) next;
> > +    VFIOUserHdr *hdr;
> >      VFIOUserFDs *fds;
> >      uint32_t rsize;
> >      uint32_t id;
> > @@ -74,5 +77,8 @@ typedef struct VFIOProxy {
> >
> >  VFIOProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp);
> >  void vfio_user_disconnect(VFIOProxy *proxy);
> > +void vfio_user_set_handler(VFIODevice *vbasedev,
> > +                           void (*handler)(void *opaque, VFIOUserMsg *msg),
> > +                           void *reqarg);
> >
> >  #endif /* VFIO_USER_H */
> > diff --git a/hw/vfio/pci.c b/hw/vfio/pci.c
> > index 9fd7c07..0de915d 100644
> > --- a/hw/vfio/pci.c
> > +++ b/hw/vfio/pci.c
> > @@ -3386,6 +3386,11 @@ type_init(register_vfio_pci_dev_type)
> >   * vfio-user routines.
> >   */
> >
> > +static void vfio_user_pci_process_req(void *opaque, VFIOUserMsg *msg)
> > +{
> > +
> > +}
> > +
> >  /*
> >   * Emulated devices don't use host hot reset
> >   */
> > @@ -3432,6 +3437,7 @@ static void vfio_user_pci_realize(PCIDevice *pdev,
> > Error **errp)
> >          return;
> >      }
> >      vbasedev->proxy = proxy;
> > +    vfio_user_set_handler(vbasedev, vfio_user_pci_process_req, vdev);
> >
> >      vbasedev->name = g_strdup_printf("VFIO user <%s>", udev->sock_name);
> >      vbasedev->dev = DEVICE(vdev);
> > diff --git a/hw/vfio/user.c b/hw/vfio/user.c
> > index c843f90..e1dfd5d 100644
> > --- a/hw/vfio/user.c
> > +++ b/hw/vfio/user.c
> > @@ -25,10 +25,26 @@
> >  #include "sysemu/iothread.h"
> >  #include "user.h"
> >
> > +static uint64_t max_xfer_size;
> >  static IOThread *vfio_user_iothread;
> >
> >  static void vfio_user_shutdown(VFIOProxy *proxy);
> > +static VFIOUserMsg *vfio_user_getmsg(VFIOProxy *proxy, VFIOUserHdr
> *hdr,
> > +                                     VFIOUserFDs *fds);
> > +static VFIOUserFDs *vfio_user_getfds(int numfds);
> > +static void vfio_user_recycle(VFIOProxy *proxy, VFIOUserMsg *msg);
> >
> > +static void vfio_user_recv(void *opaque);
> > +static int vfio_user_recv_one(VFIOProxy *proxy);
> > +static void vfio_user_cb(void *opaque);
> > +
> > +static void vfio_user_request(void *opaque);
> > +
> > +static inline void vfio_user_set_error(VFIOUserHdr *hdr, uint32_t err)
> > +{
> > +    hdr->flags |= VFIO_USER_ERROR;
> > +    hdr->error_reply = err;
> > +}
> >
> >  /*
> >   * Functions called by main, CPU, or iothread threads
> > @@ -40,10 +56,261 @@ static void vfio_user_shutdown(VFIOProxy *proxy)
> >      qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx, NULL, NULL,
> NULL);
> >  }
> >
> > +static VFIOUserMsg *vfio_user_getmsg(VFIOProxy *proxy, VFIOUserHdr
> *hdr,
> > +                                     VFIOUserFDs *fds)
> > +{
> > +    VFIOUserMsg *msg;
> > +
> > +    msg = QTAILQ_FIRST(&proxy->free);
> > +    if (msg != NULL) {
> > +        QTAILQ_REMOVE(&proxy->free, msg, next);
> > +    } else {
> > +        msg = g_malloc0(sizeof(*msg));
> > +        qemu_cond_init(&msg->cv);
> > +    }
> > +
> > +    msg->hdr = hdr;
> > +    msg->fds = fds;
> > +    return msg;
> > +}
> > +
> > +/*
> > + * Recycle a message list entry to the free list.
> > + */
> > +static void vfio_user_recycle(VFIOProxy *proxy, VFIOUserMsg *msg)
> > +{
> > +    if (msg->type == VFIO_MSG_NONE) {
> > +        error_printf("vfio_user_recycle - freeing free msg\n");
> > +        return;
> > +    }
> > +
> > +    /* free msg buffer if no one is waiting to consume the reply */
> > +    if (msg->type == VFIO_MSG_NOWAIT || msg->type == VFIO_MSG_ASYNC)
> {
> > +        g_free(msg->hdr);
> > +        if (msg->fds != NULL) {
> > +            g_free(msg->fds);
> > +        }
> > +    }
> > +
> > +    msg->type = VFIO_MSG_NONE;
> > +    msg->hdr = NULL;
> > +    msg->fds = NULL;
> > +    msg->complete = false;
> > +    QTAILQ_INSERT_HEAD(&proxy->free, msg, next);
> > +}
> > +
> > +static VFIOUserFDs *vfio_user_getfds(int numfds)
> > +{
> > +    VFIOUserFDs *fds = g_malloc0(sizeof(*fds) + (numfds * sizeof(int)));
> > +
> > +    fds->fds = (int *)((char *)fds + sizeof(*fds));
> > +
> > +    return fds;
> > +}
> > +
> >  /*
> >   * Functions only called by iothread
> >   */
> >
> > +static void vfio_user_recv(void *opaque)
> > +{
> > +    VFIOProxy *proxy = opaque;
> > +
> > +    QEMU_LOCK_GUARD(&proxy->lock);
> > +
> > +    if (proxy->state == VFIO_PROXY_CONNECTED) {
> > +        while (vfio_user_recv_one(proxy) == 0) {
> > +            ;
> > +        }
> > +    }
> > +}
> > +
> > +/*
> > + * Receive and process one incoming message.
> > + *
> > + * For replies, find matching outgoing request and wake any waiters.
> > + * For requests, queue in incoming list and run request BH.
> > + */
> > +static int vfio_user_recv_one(VFIOProxy *proxy)
> > +{
> > +    VFIOUserMsg *msg = NULL;
> > +    g_autofree int *fdp = NULL;
> > +    VFIOUserFDs *reqfds;
> > +    VFIOUserHdr hdr;
> > +    struct iovec iov = {
> > +        .iov_base = &hdr,
> > +        .iov_len = sizeof(hdr),
> > +    };
> > +    bool isreply = false;
> > +    int i, ret;
> > +    size_t msgleft, numfds = 0;
> > +    char *data = NULL;
> > +    char *buf = NULL;
> > +    Error *local_err = NULL;
> > +
> > +    /*
> > +     * Read header
> > +     */
> > +    ret = qio_channel_readv_full(proxy->ioc, &iov, 1, &fdp, &numfds,
> > +                                 &local_err);
> > +    if (ret == QIO_CHANNEL_ERR_BLOCK) {
> > +        return ret;
> > +    }
> > +    if (ret <= 0) {
> > +        /* read error or other side closed connection */
> > +        if (ret == 0) {
> > +            error_setg(&local_err, "vfio_user_recv server closed socket");
> > +        } else {
> > +            error_prepend(&local_err, "vfio_user_recv");
> > +        }
> > +        goto fatal;
> > +    }
> > +    if (ret < sizeof(msg)) {
> > +        error_setg(&local_err, "vfio_user_recv short read of header");
> > +        goto fatal;
> > +    }
> > +
> > +    /*
> > +     * Validate header
> > +     */
> > +    if (hdr.size < sizeof(VFIOUserHdr)) {
> > +        error_setg(&local_err, "vfio_user_recv bad header size");
> > +        goto fatal;
> > +    }
> > +    switch (hdr.flags & VFIO_USER_TYPE) {
> > +    case VFIO_USER_REQUEST:
> > +        isreply = false;
> > +        break;
> > +    case VFIO_USER_REPLY:
> > +        isreply = true;
> > +        break;
> > +    default:
> > +        error_setg(&local_err, "vfio_user_recv unknown message type");
> > +        goto fatal;
> > +    }
> > +
> > +    /*
> > +     * For replies, find the matching pending request.
> > +     * For requests, reap incoming FDs.
> > +     */
> > +    if (isreply) {
> > +        QTAILQ_FOREACH(msg, &proxy->pending, next) {
> > +            if (hdr.id == msg->id) {
> > +                break;
> > +            }
> > +        }
> > +        if (msg == NULL) {
> > +            error_setg(&local_err, "vfio_user_recv unexpected reply");
> > +            goto err;
> > +        }
> > +        QTAILQ_REMOVE(&proxy->pending, msg, next);
> > +
> > +        /*
> > +         * Process any received FDs
> > +         */
> > +        if (numfds != 0) {
> > +            if (msg->fds == NULL || msg->fds->recv_fds < numfds) {
> > +                error_setg(&local_err, "vfio_user_recv unexpected FDs");
> > +                goto err;
> > +            }
> > +            msg->fds->recv_fds = numfds;
> > +            memcpy(msg->fds->fds, fdp, numfds * sizeof(int));
> > +        }
> > +    } else {
> > +        if (numfds != 0) {
> > +            reqfds = vfio_user_getfds(numfds);
> > +            memcpy(reqfds->fds, fdp, numfds * sizeof(int));
> > +        } else {
> > +            reqfds = NULL;
> > +        }
> > +    }
> > +
> > +    /*
> > +     * Put the whole message into a single buffer.
> > +     */
> > +    if (isreply) {
> > +        if (hdr.size > msg->rsize) {
> > +            error_setg(&local_err,
> > +                       "vfio_user_recv reply larger than recv buffer");
> > +            goto err;
> > +        }
> > +        *msg->hdr = hdr;
> > +        data = (char *)msg->hdr + sizeof(hdr);
> > +    } else {
> > +        if (hdr.size > max_xfer_size) {
> > +            error_setg(&local_err, "vfio_user_recv request larger than max");
> > +            goto err;
> > +        }
> > +        buf = g_malloc0(hdr.size);
> > +        memcpy(buf, &hdr, sizeof(hdr));
> > +        data = buf + sizeof(hdr);
> > +        msg = vfio_user_getmsg(proxy, (VFIOUserHdr *)buf, reqfds);
> > +        msg->type = VFIO_MSG_REQ;
> > +    }
> > +
> > +    msgleft = hdr.size - sizeof(hdr);
> > +    while (msgleft > 0) {
> > +        ret = qio_channel_read(proxy->ioc, data, msgleft, &local_err);
> > +
> > +        /* error or would block */
> > +        if (ret < 0) {
> > +            goto fatal;
> > +        }
> 
> IIUC qio_channel_read() ends up calling qio_channel_socket_readv() which can
> return QIO_CHANNEL_ERR_BLOCK (-2). The if will be taken so local_err is NULL
> and that causes a segfault when error_report_err(local_err) is called before
> returning from this function.

In fact, don't we need to continue if qio_channel_read() returns QIO_CHANNEL_ERR_BLOCK and only fail if it returns -1?
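
A literal way to "continue" here (an illustrative sketch only; it blocks the iothread while waiting, and the next message in the thread explains why waiting without bound has its own problems) would be to wait for the socket to become readable again before retrying:

    while (msgleft > 0) {
        ret = qio_channel_read(proxy->ioc, data, msgleft, &local_err);

        if (ret == QIO_CHANNEL_ERR_BLOCK) {
            /* no Error is set for ERR_BLOCK; wait for more data, retry */
            qio_channel_wait(proxy->ioc, G_IO_IN);
            continue;
        }
        if (ret <= 0) {
            /* read error or EOF; make sure the fatal path has an Error */
            if (local_err == NULL) {
                error_setg(&local_err, "vfio_user_recv socket closed");
            }
            goto fatal;
        }

        msgleft -= ret;
        data += ret;
    }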

> 
> > +
> > +        msgleft -= ret;
> > +        data += ret;
> > +    }
> > +
> > +    /*
> > +     * Replies signal a waiter, if none just check for errors
> > +     * and free the message buffer.
> > +     *
> > +     * Requests get queued for the BH.
> > +     */
> > +    if (isreply) {
> > +        msg->complete = true;
> > +        if (msg->type == VFIO_MSG_WAIT) {
> > +            qemu_cond_signal(&msg->cv);
> > +        } else {
> > +            if (hdr.flags & VFIO_USER_ERROR) {
> > +                error_printf("vfio_user_rcv error reply on async request ");
> > +                error_printf("command %x error %s\n", hdr.command,
> > +                             strerror(hdr.error_reply));
> > +            }
> > +            /* youngest nowait msg has been ack'd */
> > +            if (proxy->last_nowait == msg) {
> > +                proxy->last_nowait = NULL;
> > +            }
> > +            vfio_user_recycle(proxy, msg);
> > +        }
> > +    } else {
> > +        QTAILQ_INSERT_TAIL(&proxy->incoming, msg, next);
> > +        qemu_bh_schedule(proxy->req_bh);
> > +    }
> > +    return 0;
> > +
> > +    /*
> > +     * fatal means the other side closed or we don't trust the stream
> > +     * err means this message is corrupt
> > +     */
> > +fatal:
> > +    vfio_user_shutdown(proxy);
> > +    proxy->state = VFIO_PROXY_ERROR;
> > +
> > +err:
> > +    for (i = 0; i < numfds; i++) {
> > +        close(fdp[i]);
> > +    }
> > +    if (isreply && msg != NULL) {
> > +        /* force an error to keep sending thread from hanging */
> > +        vfio_user_set_error(msg->hdr, EINVAL);
> > +        msg->complete = true;
> > +        qemu_cond_signal(&msg->cv);
> > +    }
> > +    error_report_err(local_err);
> > +    return -1;
> > +}
> > +
> >  static void vfio_user_cb(void *opaque)
> >  {
> >      VFIOProxy *proxy = opaque;
> > @@ -59,6 +326,51 @@ static void vfio_user_cb(void *opaque)
> >   * Functions called by main or CPU threads
> >   */
> >
> > +/*
> > + * Process incoming requests.
> > + *
> > + * The bus-specific callback has the form:
> > + *    request(opaque, msg)
> > + * where 'opaque' was specified in vfio_user_set_handler
> > + * and 'msg' is the inbound message.
> > + *
> > + * The callback is responsible for disposing of the message buffer,
> > + * usually by re-using it when calling vfio_send_reply or vfio_send_error,
> > + * both of which free their message buffer when the reply is sent.
> > + *
> > + * If the callback uses a new buffer, it needs to free the old one.
> > + */
> > +static void vfio_user_request(void *opaque)
> > +{
> > +    VFIOProxy *proxy = opaque;
> > +    VFIOUserMsgQ new, free;
> > +    VFIOUserMsg *msg, *m1;
> > +
> > +    /* reap all incoming */
> > +    QTAILQ_INIT(&new);
> > +    WITH_QEMU_LOCK_GUARD(&proxy->lock) {
> > +        QTAILQ_FOREACH_SAFE(msg, &proxy->incoming, next, m1) {
> > +            QTAILQ_REMOVE(&proxy->pending, msg, next);
> > +            QTAILQ_INSERT_TAIL(&new, msg, next);
> > +        }
> > +    }
> > +
> > +    /* process list */
> > +    QTAILQ_INIT(&free);
> > +    QTAILQ_FOREACH_SAFE(msg, &new, next, m1) {
> > +        QTAILQ_REMOVE(&new, msg, next);
> > +        proxy->request(proxy->req_arg, msg);
> > +        QTAILQ_INSERT_HEAD(&free, msg, next);
> > +    }
> > +
> > +    /* free list */
> > +    WITH_QEMU_LOCK_GUARD(&proxy->lock) {
> > +        QTAILQ_FOREACH_SAFE(msg, &free, next, m1) {
> > +            vfio_user_recycle(proxy, msg);
> > +        }
> > +    }
> > +}
> > +
> >  static QLIST_HEAD(, VFIOProxy) vfio_user_sockets =
> >      QLIST_HEAD_INITIALIZER(vfio_user_sockets);
> >
> > @@ -97,6 +409,7 @@ VFIOProxy *vfio_user_connect_dev(SocketAddress
> > *addr, Error **errp)
> >      }
> >
> >      proxy->ctx = iothread_get_aio_context(vfio_user_iothread);
> > +    proxy->req_bh = qemu_bh_new(vfio_user_request, proxy);
> >
> >      QTAILQ_INIT(&proxy->outgoing);
> >      QTAILQ_INIT(&proxy->incoming);
> > @@ -107,6 +420,18 @@ VFIOProxy *vfio_user_connect_dev(SocketAddress
> > *addr, Error **errp)
> >      return proxy;
> >  }
> >
> > +void vfio_user_set_handler(VFIODevice *vbasedev,
> > +                           void (*handler)(void *opaque, VFIOUserMsg *msg),
> > +                           void *req_arg)
> > +{
> > +    VFIOProxy *proxy = vbasedev->proxy;
> > +
> > +    proxy->request = handler;
> > +    proxy->req_arg = req_arg;
> > +    qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx,
> > +                                   vfio_user_recv, NULL, proxy);
> > +}
> > +
> >  void vfio_user_disconnect(VFIOProxy *proxy)
> >  {
> >      VFIOUserMsg *r1, *r2;
> > @@ -122,6 +447,8 @@ void vfio_user_disconnect(VFIOProxy *proxy)
> >      }
> >      object_unref(OBJECT(proxy->ioc));
> >      proxy->ioc = NULL;
> > +    qemu_bh_delete(proxy->req_bh);
> > +    proxy->req_bh = NULL;
> >
> >      proxy->state = VFIO_PROXY_CLOSING;
> >      QTAILQ_FOREACH_SAFE(r1, &proxy->outgoing, next, r2) {
> > diff --git a/MAINTAINERS b/MAINTAINERS
> > index cfaccbf..bc0ba88 100644
> > --- a/MAINTAINERS
> > +++ b/MAINTAINERS
> > @@ -1909,6 +1909,7 @@ S: Supported
> >  F: docs/devel/vfio-user.rst
> >  F: hw/vfio/user.c
> >  F: hw/vfio/user.h
> > +F: hw/vfio/user-protocol.h
> >
> >  vhost
> >  M: Michael S. Tsirkin <mst@redhat.com>
> > --
> > 1.8.3.1
> >
John Johnson Feb. 7, 2022, 7:07 a.m. UTC | #3
> On Feb 4, 2022, at 4:42 AM, Thanos Makatos <thanos.makatos@nutanix.com> wrote:
> 
>> -----Original Message-----
>> From: Qemu-devel <qemu-devel-
>> bounces+thanos.makatos=nutanix.com@nongnu.org> On Behalf Of Thanos
>> Makatos
>> Sent: 03 February 2022 21:54
>> To: John Johnson <john.g.johnson@oracle.com>; qemu-devel@nongnu.org
>> Subject: RE: [RFC v4 08/21] vfio-user: define socket receive functions
>> 
>> 
>> 
>>> -----Original Message-----
>>> From: Qemu-devel <qemu-devel-
>>> bounces+thanos.makatos=nutanix.com@nongnu.org> On Behalf Of John
>>> Johnson
>>> Sent: 12 January 2022 00:44
>>> To: qemu-devel@nongnu.org
>>> Subject: [RFC v4 08/21] vfio-user: define socket receive functions
>>> 
>>> +    }
>>> +
>>> +    msgleft = hdr.size - sizeof(hdr);
>>> +    while (msgleft > 0) {
>>> +        ret = qio_channel_read(proxy->ioc, data, msgleft, &local_err);
>>> +
>>> +        /* error or would block */
>>> +        if (ret < 0) {
>>> +            goto fatal;
>>> +        }
>> 
>> IIUC qio_channel_read() ends up calling qio_channel_socket_readv() which can
>> return QIO_CHANNEL_ERR_BLOCK (-2). The if will be taken so local_err is NULL
>> and that causes a segfault when error_report_err(local_err) is called before
>> returning from this function.
> 
> In fact, don't we need to continue if qio_channel_read() returns QIO_CHANNEL_ERR_BLOCK and only fail if it returns -1?
> 
>> 
>>> +
>>> +        msgleft -= ret;
>>> +        data += ret;
>>> +    }
>>> +


	I can’t loop indefinitely, as a malicious server could cause the receiver to loop
continuously if it sends a packet with a header length greater than the packet length.

	If large messages are being fragmented by the socket code, then I think I’ll need
to change the packet parser to be able to reassemble them.

									JJ
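
A sketch of what such reassembly could look like (illustrative only; the part_recv/recv_left fields are hypothetical, not defined by this series, and the matching resume path at the top of vfio_user_recv_one() is omitted): stash the partially read message when the read would block and finish it from the next readable callback, rather than treating ERR_BLOCK as fatal:

    while (msgleft > 0) {
        ret = qio_channel_read(proxy->ioc, data, msgleft, &local_err);

        if (ret == QIO_CHANNEL_ERR_BLOCK) {
            /*
             * Socket buffer drained mid-message: remember how far we got
             * and let the next POLLIN callback finish the read.  hdr.size
             * was already checked against max_xfer_size (or msg->rsize),
             * so a malicious peer cannot grow this state without limit.
             */
            proxy->part_recv = msg;       /* hypothetical field */
            proxy->recv_left = msgleft;   /* hypothetical field */
            return ret;
        }
        if (ret <= 0) {
            if (local_err == NULL) {
                error_setg(&local_err, "vfio_user_recv socket closed");
            }
            goto fatal;
        }

        msgleft -= ret;
        data += ret;
    }
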
Thanos Makatos Feb. 15, 2022, 1:35 p.m. UTC | #4
> -----Original Message-----
> From: Qemu-devel <qemu-devel-
> bounces+thanos.makatos=nutanix.com@nongnu.org> On Behalf Of John
> Johnson
> Sent: 12 January 2022 00:44
> To: qemu-devel@nongnu.org
> Subject: [RFC v4 08/21] vfio-user: define socket receive functions
> 
> Add infrastructure needed to receive incoming messages
> 
> Signed-off-by: John G Johnson <john.g.johnson@oracle.com>
> Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
> Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
> ---
>  hw/vfio/user-protocol.h |  54 ++++++++
>  hw/vfio/user.h          |   6 +
>  hw/vfio/pci.c           |   6 +
>  hw/vfio/user.c          | 327
> ++++++++++++++++++++++++++++++++++++++++++++++++
>  MAINTAINERS             |   1 +
>  5 files changed, 394 insertions(+)
>  create mode 100644 hw/vfio/user-protocol.h
> 
> diff --git a/hw/vfio/user-protocol.h b/hw/vfio/user-protocol.h
> new file mode 100644
> index 0000000..d23877c
> --- /dev/null
> +++ b/hw/vfio/user-protocol.h
> @@ -0,0 +1,54 @@
> +#ifndef VFIO_USER_PROTOCOL_H
> +#define VFIO_USER_PROTOCOL_H
> +
> +/*
> + * vfio protocol over a UNIX socket.
> + *
> + * Copyright © 2018, 2021 Oracle and/or its affiliates.
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2.  See
> + * the COPYING file in the top-level directory.
> + *
> + * Each message has a standard header that describes the command
> + * being sent, which is almost always a VFIO ioctl().
> + *
> + * The header may be followed by command-specific data, such as the
> + * region and offset info for read and write commands.
> + */
> +
> +typedef struct {
> +    uint16_t id;
> +    uint16_t command;
> +    uint32_t size;
> +    uint32_t flags;
> +    uint32_t error_reply;
> +} VFIOUserHdr;
> +
> +/* VFIOUserHdr commands */
> +enum vfio_user_command {
> +    VFIO_USER_VERSION                   = 1,
> +    VFIO_USER_DMA_MAP                   = 2,
> +    VFIO_USER_DMA_UNMAP                 = 3,
> +    VFIO_USER_DEVICE_GET_INFO           = 4,
> +    VFIO_USER_DEVICE_GET_REGION_INFO    = 5,
> +    VFIO_USER_DEVICE_GET_REGION_IO_FDS  = 6,
> +    VFIO_USER_DEVICE_GET_IRQ_INFO       = 7,
> +    VFIO_USER_DEVICE_SET_IRQS           = 8,
> +    VFIO_USER_REGION_READ               = 9,
> +    VFIO_USER_REGION_WRITE              = 10,
> +    VFIO_USER_DMA_READ                  = 11,
> +    VFIO_USER_DMA_WRITE                 = 12,
> +    VFIO_USER_DEVICE_RESET              = 13,
> +    VFIO_USER_DIRTY_PAGES               = 14,
> +    VFIO_USER_MAX,
> +};
> +
> +/* VFIOUserHdr flags */
> +#define VFIO_USER_REQUEST       0x0
> +#define VFIO_USER_REPLY         0x1
> +#define VFIO_USER_TYPE          0xF
> +
> +#define VFIO_USER_NO_REPLY      0x10
> +#define VFIO_USER_ERROR         0x20
> +
> +#endif /* VFIO_USER_PROTOCOL_H */
> diff --git a/hw/vfio/user.h b/hw/vfio/user.h
> index da92862..72eefa7 100644
> --- a/hw/vfio/user.h
> +++ b/hw/vfio/user.h
> @@ -11,6 +11,8 @@
>   *
>   */
> 
> +#include "user-protocol.h"
> +
>  typedef struct {
>      int send_fds;
>      int recv_fds;
> @@ -27,6 +29,7 @@ enum msg_type {
> 
>  typedef struct VFIOUserMsg {
>      QTAILQ_ENTRY(VFIOUserMsg) next;
> +    VFIOUserHdr *hdr;
>      VFIOUserFDs *fds;
>      uint32_t rsize;
>      uint32_t id;
> @@ -74,5 +77,8 @@ typedef struct VFIOProxy {
> 
>  VFIOProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp);
>  void vfio_user_disconnect(VFIOProxy *proxy);
> +void vfio_user_set_handler(VFIODevice *vbasedev,
> +                           void (*handler)(void *opaque, VFIOUserMsg *msg),
> +                           void *reqarg);
> 
>  #endif /* VFIO_USER_H */
> diff --git a/hw/vfio/pci.c b/hw/vfio/pci.c
> index 9fd7c07..0de915d 100644
> --- a/hw/vfio/pci.c
> +++ b/hw/vfio/pci.c
> @@ -3386,6 +3386,11 @@ type_init(register_vfio_pci_dev_type)
>   * vfio-user routines.
>   */
> 
> +static void vfio_user_pci_process_req(void *opaque, VFIOUserMsg *msg)
> +{
> +
> +}
> +
>  /*
>   * Emulated devices don't use host hot reset
>   */
> @@ -3432,6 +3437,7 @@ static void vfio_user_pci_realize(PCIDevice *pdev,
> Error **errp)
>          return;
>      }
>      vbasedev->proxy = proxy;
> +    vfio_user_set_handler(vbasedev, vfio_user_pci_process_req, vdev);
> 
>      vbasedev->name = g_strdup_printf("VFIO user <%s>", udev->sock_name);
>      vbasedev->dev = DEVICE(vdev);
> diff --git a/hw/vfio/user.c b/hw/vfio/user.c
> index c843f90..e1dfd5d 100644
> --- a/hw/vfio/user.c
> +++ b/hw/vfio/user.c
> @@ -25,10 +25,26 @@
>  #include "sysemu/iothread.h"
>  #include "user.h"
> 
> +static uint64_t max_xfer_size;
>  static IOThread *vfio_user_iothread;
> 
>  static void vfio_user_shutdown(VFIOProxy *proxy);
> +static VFIOUserMsg *vfio_user_getmsg(VFIOProxy *proxy, VFIOUserHdr *hdr,
> +                                     VFIOUserFDs *fds);
> +static VFIOUserFDs *vfio_user_getfds(int numfds);
> +static void vfio_user_recycle(VFIOProxy *proxy, VFIOUserMsg *msg);
> 
> +static void vfio_user_recv(void *opaque);
> +static int vfio_user_recv_one(VFIOProxy *proxy);
> +static void vfio_user_cb(void *opaque);
> +
> +static void vfio_user_request(void *opaque);
> +
> +static inline void vfio_user_set_error(VFIOUserHdr *hdr, uint32_t err)
> +{
> +    hdr->flags |= VFIO_USER_ERROR;
> +    hdr->error_reply = err;
> +}
> 
>  /*
>   * Functions called by main, CPU, or iothread threads
> @@ -40,10 +56,261 @@ static void vfio_user_shutdown(VFIOProxy *proxy)
>      qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx, NULL, NULL, NULL);
>  }
> 
> +static VFIOUserMsg *vfio_user_getmsg(VFIOProxy *proxy, VFIOUserHdr *hdr,
> +                                     VFIOUserFDs *fds)
> +{
> +    VFIOUserMsg *msg;
> +
> +    msg = QTAILQ_FIRST(&proxy->free);
> +    if (msg != NULL) {
> +        QTAILQ_REMOVE(&proxy->free, msg, next);
> +    } else {
> +        msg = g_malloc0(sizeof(*msg));
> +        qemu_cond_init(&msg->cv);
> +    }
> +
> +    msg->hdr = hdr;
> +    msg->fds = fds;
> +    return msg;
> +}
> +
> +/*
> + * Recycle a message list entry to the free list.
> + */
> +static void vfio_user_recycle(VFIOProxy *proxy, VFIOUserMsg *msg)
> +{
> +    if (msg->type == VFIO_MSG_NONE) {
> +        error_printf("vfio_user_recycle - freeing free msg\n");
> +        return;
> +    }
> +
> +    /* free msg buffer if no one is waiting to consume the reply */
> +    if (msg->type == VFIO_MSG_NOWAIT || msg->type == VFIO_MSG_ASYNC) {
> +        g_free(msg->hdr);
> +        if (msg->fds != NULL) {
> +            g_free(msg->fds);
> +        }
> +    }
> +
> +    msg->type = VFIO_MSG_NONE;
> +    msg->hdr = NULL;
> +    msg->fds = NULL;
> +    msg->complete = false;
> +    QTAILQ_INSERT_HEAD(&proxy->free, msg, next);
> +}
> +
> +static VFIOUserFDs *vfio_user_getfds(int numfds)
> +{
> +    VFIOUserFDs *fds = g_malloc0(sizeof(*fds) + (numfds * sizeof(int)));
> +
> +    fds->fds = (int *)((char *)fds + sizeof(*fds));
> +
> +    return fds;
> +}
> +
>  /*
>   * Functions only called by iothread
>   */
> 
> +static void vfio_user_recv(void *opaque)
> +{
> +    VFIOProxy *proxy = opaque;
> +
> +    QEMU_LOCK_GUARD(&proxy->lock);
> +
> +    if (proxy->state == VFIO_PROXY_CONNECTED) {
> +        while (vfio_user_recv_one(proxy) == 0) {
> +            ;
> +        }
> +    }
> +}
> +
> +/*
> + * Receive and process one incoming message.
> + *
> + * For replies, find matching outgoing request and wake any waiters.
> + * For requests, queue in incoming list and run request BH.
> + */
> +static int vfio_user_recv_one(VFIOProxy *proxy)
> +{
> +    VFIOUserMsg *msg = NULL;
> +    g_autofree int *fdp = NULL;
> +    VFIOUserFDs *reqfds;
> +    VFIOUserHdr hdr;
> +    struct iovec iov = {
> +        .iov_base = &hdr,
> +        .iov_len = sizeof(hdr),
> +    };
> +    bool isreply = false;
> +    int i, ret;
> +    size_t msgleft, numfds = 0;
> +    char *data = NULL;
> +    char *buf = NULL;
> +    Error *local_err = NULL;
> +
> +    /*
> +     * Read header
> +     */
> +    ret = qio_channel_readv_full(proxy->ioc, &iov, 1, &fdp, &numfds,
> +                                 &local_err);
> +    if (ret == QIO_CHANNEL_ERR_BLOCK) {
> +        return ret;
> +    }
> +    if (ret <= 0) {
> +        /* read error or other side closed connection */
> +        if (ret == 0) {
> +            error_setg(&local_err, "vfio_user_recv server closed socket");
> +        } else {
> +            error_prepend(&local_err, "vfio_user_recv");
> +        }
> +        goto fatal;
> +    }
> +    if (ret < sizeof(msg)) {
> +        error_setg(&local_err, "vfio_user_recv short read of header");
> +        goto fatal;
> +    }

Print received size for debug purposes?
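
For illustration, the kind of change being suggested might look like this (wording hypothetical, and comparing against sizeof(hdr) on the assumption that the header size is the intended bound):

    if (ret < sizeof(hdr)) {
        error_setg(&local_err,
                   "vfio_user_recv short read of header (%d of %zu bytes)",
                   ret, sizeof(hdr));
        goto fatal;
    }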

> +
> +    /*
> +     * Validate header
> +     */
> +    if (hdr.size < sizeof(VFIOUserHdr)) {
> +        error_setg(&local_err, "vfio_user_recv bad header size");
> +        goto fatal;
> +    }

Print header size?

> +    switch (hdr.flags & VFIO_USER_TYPE) {
> +    case VFIO_USER_REQUEST:
> +        isreply = false;
> +        break;
> +    case VFIO_USER_REPLY:
> +        isreply = true;
> +        break;
> +    default:
> +        error_setg(&local_err, "vfio_user_recv unknown message type");
> +        goto fatal;
> +    }

Print message type?

> +
> +    /*
> +     * For replies, find the matching pending request.
> +     * For requests, reap incoming FDs.
> +     */
> +    if (isreply) {
> +        QTAILQ_FOREACH(msg, &proxy->pending, next) {
> +            if (hdr.id == msg->id) {
> +                break;
> +            }
> +        }
> +        if (msg == NULL) {
> +            error_setg(&local_err, "vfio_user_recv unexpected reply");
> +            goto err;
> +        }
> +        QTAILQ_REMOVE(&proxy->pending, msg, next);
> +
> +        /*
> +         * Process any received FDs
> +         */
> +        if (numfds != 0) {
> +            if (msg->fds == NULL || msg->fds->recv_fds < numfds) {
> +                error_setg(&local_err, "vfio_user_recv unexpected FDs");
> +                goto err;
> +            }
> +            msg->fds->recv_fds = numfds;
> +            memcpy(msg->fds->fds, fdp, numfds * sizeof(int));
> +        }
> +    } else {
> +        if (numfds != 0) {
> +            reqfds = vfio_user_getfds(numfds);
> +            memcpy(reqfds->fds, fdp, numfds * sizeof(int));
> +        } else {
> +            reqfds = NULL;
> +        }
> +    }
> +
> +    /*
> +     * Put the whole message into a single buffer.
> +     */
> +    if (isreply) {
> +        if (hdr.size > msg->rsize) {
> +            error_setg(&local_err,
> +                       "vfio_user_recv reply larger than recv buffer");
> +            goto err;
> +        }

Print hdr.size and msg->rsize?

> +        *msg->hdr = hdr;
> +        data = (char *)msg->hdr + sizeof(hdr);
> +    } else {
> +        if (hdr.size > max_xfer_size) {
> +            error_setg(&local_err, "vfio_user_recv request larger than max");
> +            goto err;
> +        }

Print hdr.size?

> +        buf = g_malloc0(hdr.size);
> +        memcpy(buf, &hdr, sizeof(hdr));
> +        data = buf + sizeof(hdr);
> +        msg = vfio_user_getmsg(proxy, (VFIOUserHdr *)buf, reqfds);
> +        msg->type = VFIO_MSG_REQ;
> +    }
> +
> +    msgleft = hdr.size - sizeof(hdr);
> +    while (msgleft > 0) {
> +        ret = qio_channel_read(proxy->ioc, data, msgleft, &local_err);
> +
> +        /* error or would block */
> +        if (ret < 0) {
> +            goto fatal;
> +        }
> +
> +        msgleft -= ret;
> +        data += ret;
> +    }
> +
> +    /*
> +     * Replies signal a waiter, if none just check for errors
> +     * and free the message buffer.
> +     *
> +     * Requests get queued for the BH.
> +     */
> +    if (isreply) {
> +        msg->complete = true;
> +        if (msg->type == VFIO_MSG_WAIT) {
> +            qemu_cond_signal(&msg->cv);
> +        } else {
> +            if (hdr.flags & VFIO_USER_ERROR) {
> +                error_printf("vfio_user_rcv error reply on async request ");
> +                error_printf("command %x error %s\n", hdr.command,
> +                             strerror(hdr.error_reply));
> +            }
> +            /* youngest nowait msg has been ack'd */
> +            if (proxy->last_nowait == msg) {
> +                proxy->last_nowait = NULL;
> +            }
> +            vfio_user_recycle(proxy, msg);
> +        }
> +    } else {
> +        QTAILQ_INSERT_TAIL(&proxy->incoming, msg, next);
> +        qemu_bh_schedule(proxy->req_bh);
> +    }
> +    return 0;
> +
> +    /*
> +     * fatal means the other side closed or we don't trust the stream
> +     * err means this message is corrupt
> +     */
> +fatal:
> +    vfio_user_shutdown(proxy);
> +    proxy->state = VFIO_PROXY_ERROR;
> +
> +err:
> +    for (i = 0; i < numfds; i++) {
> +        close(fdp[i]);
> +    }
> +    if (isreply && msg != NULL) {
> +        /* force an error to keep sending thread from hanging */
> +        vfio_user_set_error(msg->hdr, EINVAL);
> +        msg->complete = true;
> +        qemu_cond_signal(&msg->cv);
> +    }
> +    error_report_err(local_err);
> +    return -1;
> +}
> +
>  static void vfio_user_cb(void *opaque)
>  {
>      VFIOProxy *proxy = opaque;
> @@ -59,6 +326,51 @@ static void vfio_user_cb(void *opaque)
>   * Functions called by main or CPU threads
>   */
> 
> +/*
> + * Process incoming requests.
> + *
> + * The bus-specific callback has the form:
> + *    request(opaque, msg)
> + * where 'opaque' was specified in vfio_user_set_handler
> + * and 'msg' is the inbound message.
> + *
> + * The callback is responsible for disposing of the message buffer,
> + * usually by re-using it when calling vfio_send_reply or vfio_send_error,
> + * both of which free their message buffer when the reply is sent.
> + *
> + * If the callback uses a new buffer, it needs to free the old one.
> + */
> +static void vfio_user_request(void *opaque)
> +{
> +    VFIOProxy *proxy = opaque;
> +    VFIOUserMsgQ new, free;
> +    VFIOUserMsg *msg, *m1;
> +
> +    /* reap all incoming */
> +    QTAILQ_INIT(&new);
> +    WITH_QEMU_LOCK_GUARD(&proxy->lock) {
> +        QTAILQ_FOREACH_SAFE(msg, &proxy->incoming, next, m1) {
> +            QTAILQ_REMOVE(&proxy->pending, msg, next);
> +            QTAILQ_INSERT_TAIL(&new, msg, next);
> +        }
> +    }
> +
> +    /* process list */
> +    QTAILQ_INIT(&free);
> +    QTAILQ_FOREACH_SAFE(msg, &new, next, m1) {
> +        QTAILQ_REMOVE(&new, msg, next);
> +        proxy->request(proxy->req_arg, msg);
> +        QTAILQ_INSERT_HEAD(&free, msg, next);
> +    }
> +
> +    /* free list */
> +    WITH_QEMU_LOCK_GUARD(&proxy->lock) {
> +        QTAILQ_FOREACH_SAFE(msg, &free, next, m1) {
> +            vfio_user_recycle(proxy, msg);
> +        }
> +    }
> +}
> +
>  static QLIST_HEAD(, VFIOProxy) vfio_user_sockets =
>      QLIST_HEAD_INITIALIZER(vfio_user_sockets);
> 
> @@ -97,6 +409,7 @@ VFIOProxy *vfio_user_connect_dev(SocketAddress
> *addr, Error **errp)
>      }
> 
>      proxy->ctx = iothread_get_aio_context(vfio_user_iothread);
> +    proxy->req_bh = qemu_bh_new(vfio_user_request, proxy);
> 
>      QTAILQ_INIT(&proxy->outgoing);
>      QTAILQ_INIT(&proxy->incoming);
> @@ -107,6 +420,18 @@ VFIOProxy *vfio_user_connect_dev(SocketAddress
> *addr, Error **errp)
>      return proxy;
>  }
> 
> +void vfio_user_set_handler(VFIODevice *vbasedev,
> +                           void (*handler)(void *opaque, VFIOUserMsg *msg),
> +                           void *req_arg)
> +{
> +    VFIOProxy *proxy = vbasedev->proxy;
> +
> +    proxy->request = handler;
> +    proxy->req_arg = req_arg;
> +    qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx,
> +                                   vfio_user_recv, NULL, proxy);
> +}
> +
>  void vfio_user_disconnect(VFIOProxy *proxy)
>  {
>      VFIOUserMsg *r1, *r2;
> @@ -122,6 +447,8 @@ void vfio_user_disconnect(VFIOProxy *proxy)
>      }
>      object_unref(OBJECT(proxy->ioc));
>      proxy->ioc = NULL;
> +    qemu_bh_delete(proxy->req_bh);
> +    proxy->req_bh = NULL;
> 
>      proxy->state = VFIO_PROXY_CLOSING;
>      QTAILQ_FOREACH_SAFE(r1, &proxy->outgoing, next, r2) {
> diff --git a/MAINTAINERS b/MAINTAINERS
> index cfaccbf..bc0ba88 100644
> --- a/MAINTAINERS
> +++ b/MAINTAINERS
> @@ -1909,6 +1909,7 @@ S: Supported
>  F: docs/devel/vfio-user.rst
>  F: hw/vfio/user.c
>  F: hw/vfio/user.h
> +F: hw/vfio/user-protocol.h
> 
>  vhost
>  M: Michael S. Tsirkin <mst@redhat.com>
> --
> 1.8.3.1
>
Thanos Makatos Feb. 15, 2022, 2:50 p.m. UTC | #5
> > +/*
> > + * Receive and process one incoming message.
> > + *
> > + * For replies, find matching outgoing request and wake any waiters.
> > + * For requests, queue in incoming list and run request BH.
> > + */
> > +static int vfio_user_recv_one(VFIOProxy *proxy)
> > +{
> > +    VFIOUserMsg *msg = NULL;
> > +    g_autofree int *fdp = NULL;
> > +    VFIOUserFDs *reqfds;
> > +    VFIOUserHdr hdr;
> > +    struct iovec iov = {
> > +        .iov_base = &hdr,
> > +        .iov_len = sizeof(hdr),
> > +    };
> > +    bool isreply = false;
> > +    int i, ret;
> > +    size_t msgleft, numfds = 0;
> > +    char *data = NULL;
> > +    char *buf = NULL;
> > +    Error *local_err = NULL;
> > +
> > +    /*
> > +     * Read header
> > +     */
> > +    ret = qio_channel_readv_full(proxy->ioc, &iov, 1, &fdp, &numfds,
> > +                                 &local_err);
> > +    if (ret == QIO_CHANNEL_ERR_BLOCK) {
> > +        return ret;
> > +    }
> > +    if (ret <= 0) {
> > +        /* read error or other side closed connection */
> > +        if (ret == 0) {
> > +            error_setg(&local_err, "vfio_user_recv server closed socket");
> > +        } else {
> > +            error_prepend(&local_err, "vfio_user_recv");
> > +        }
> > +        goto fatal;
> > +    }
> > +    if (ret < sizeof(msg)) {
> > +        error_setg(&local_err, "vfio_user_recv short read of header");
> > +        goto fatal;
> > +    }
> 
> Print received size for debug purposes?
> 
> > +
> > +    /*
> > +     * Validate header
> > +     */
> > +    if (hdr.size < sizeof(VFIOUserHdr)) {
> > +        error_setg(&local_err, "vfio_user_recv bad header size");
> > +        goto fatal;
> > +    }
> 
> Print header size?
> 
> > +    switch (hdr.flags & VFIO_USER_TYPE) {
> > +    case VFIO_USER_REQUEST:
> > +        isreply = false;
> > +        break;
> > +    case VFIO_USER_REPLY:
> > +        isreply = true;
> > +        break;
> > +    default:
> > +        error_setg(&local_err, "vfio_user_recv unknown message type");
> > +        goto fatal;
> > +    }
> 
> Print message type?
> 
> > +
> > +    /*
> > +     * For replies, find the matching pending request.
> > +     * For requests, reap incoming FDs.
> > +     */
> > +    if (isreply) {
> > +        QTAILQ_FOREACH(msg, &proxy->pending, next) {
> > +            if (hdr.id == msg->id) {
> > +                break;
> > +            }
> > +        }
> > +        if (msg == NULL) {
> > +            error_setg(&local_err, "vfio_user_recv unexpected reply");
> > +            goto err;
> > +        }
> > +        QTAILQ_REMOVE(&proxy->pending, msg, next);
> > +
> > +        /*
> > +         * Process any received FDs
> > +         */
> > +        if (numfds != 0) {
> > +            if (msg->fds == NULL || msg->fds->recv_fds < numfds) {
> > +                error_setg(&local_err, "vfio_user_recv unexpected FDs");
> > +                goto err;
> > +            }
> > +            msg->fds->recv_fds = numfds;
> > +            memcpy(msg->fds->fds, fdp, numfds * sizeof(int));
> > +        }
> > +    } else {
> > +        if (numfds != 0) {
> > +            reqfds = vfio_user_getfds(numfds);
> > +            memcpy(reqfds->fds, fdp, numfds * sizeof(int));
> > +        } else {
> > +            reqfds = NULL;
> > +        }
> > +    }
> > +
> > +    /*
> > +     * Put the whole message into a single buffer.
> > +     */
> > +    if (isreply) {
> > +        if (hdr.size > msg->rsize) {
> > +            error_setg(&local_err,
> > +                       "vfio_user_recv reply larger than recv buffer");
> > +            goto err;
> > +        }
> 
> Print hdr.size and msg->rsize?
> 
> > +        *msg->hdr = hdr;
> > +        data = (char *)msg->hdr + sizeof(hdr);
> > +    } else {
> > +        if (hdr.size > max_xfer_size) {
> > +            error_setg(&local_err, "vfio_user_recv request larger than max");
> > +            goto err;
> > +        }
> 
> Print hdr.size?

On second thought, should we dump the entire header in case of such errors? If not by default then at least in debug builds?
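
For illustration only, a dump along those lines could be as small as the
sketch below; the helper name and the message wording are assumptions, only
the field names come from VFIOUserHdr:

    /*
     * Hypothetical debug helper (not part of the patch): print the full
     * header when validation fails.
     */
    static void vfio_user_dump_hdr(const char *why, VFIOUserHdr *hdr)
    {
        error_printf("vfio_user_recv %s: id 0x%x cmd 0x%x size 0x%x "
                     "flags 0x%x error %u\n", why, hdr->id, hdr->command,
                     hdr->size, hdr->flags, hdr->error_reply);
    }

e.g. vfio_user_dump_hdr("bad header size", &hdr) just before the goto fatal.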
John Johnson Feb. 16, 2022, 2:09 a.m. UTC | #6
> On Feb 15, 2022, at 6:50 AM, Thanos Makatos <thanos.makatos@nutanix.com> wrote:
> 
>>> 
> 
> On second thought, should we dump the entire header in case of such errors? If not by default then at least in debug builds?


	I was thinking of adding qemu tracepoints in the recv and send paths
for your other debug rfe.  Maybe I’ll add one set for the normal path that
prints an abbreviated header, and another set for the error case that prints
the whole header.  Would that work?
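
	Roughly like this (the tracepoint names and formats below are
placeholders, not what will actually land in the next revision):

    # hw/vfio/trace-events (placeholder entries)
    vfio_user_recv_hdr(uint16_t id, uint16_t cmd, uint32_t size) "id 0x%x cmd 0x%x size 0x%x"
    vfio_user_recv_bad_hdr(uint16_t id, uint16_t cmd, uint32_t size, uint32_t flags, uint32_t err) "id 0x%x cmd 0x%x size 0x%x flags 0x%x error %u"

    /* placeholder call sites in vfio_user_recv_one() */
    trace_vfio_user_recv_hdr(hdr.id, hdr.command, hdr.size);   /* normal path */
    trace_vfio_user_recv_bad_hdr(hdr.id, hdr.command, hdr.size,
                                 hdr.flags, hdr.error_reply);  /* before goto fatal/err */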

								JJ
Thanos Makatos Feb. 16, 2022, 9:31 a.m. UTC | #7
> -----Original Message-----
> From: John Johnson <john.g.johnson@oracle.com>
> Sent: 16 February 2022 02:10
> To: Thanos Makatos <thanos.makatos@nutanix.com>
> Cc: qemu-devel@nongnu.org
> Subject: Re: [RFC v4 08/21] vfio-user: define socket receive functions
> 
> 
> 
> > On Feb 15, 2022, at 6:50 AM, Thanos Makatos
> <thanos.makatos@nutanix.com> wrote:
> >
> >>>
> >
> > On second thought, should we dump the entire header in case of such errors?
> > If not by default then at least in debug builds?
> 
> 
> 	I was thinking of adding qemu tracepoints in the recv and send paths
> for your other debug rfe.  Maybe I’ll add one set for the normal path that
> prints an abbreviated header, and another set for the error case that prints
> the whole header.  Would that work?

Yes that would be great.
diff mbox series

Patch

diff --git a/hw/vfio/user-protocol.h b/hw/vfio/user-protocol.h
new file mode 100644
index 0000000..d23877c
--- /dev/null
+++ b/hw/vfio/user-protocol.h
@@ -0,0 +1,54 @@ 
+#ifndef VFIO_USER_PROTOCOL_H
+#define VFIO_USER_PROTOCOL_H
+
+/*
+ * vfio protocol over a UNIX socket.
+ *
+ * Copyright © 2018, 2021 Oracle and/or its affiliates.
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ *
+ * Each message has a standard header that describes the command
+ * being sent, which is almost always a VFIO ioctl().
+ *
+ * The header may be followed by command-specific data, such as the
+ * region and offset info for read and write commands.
+ */
+
+typedef struct {
+    uint16_t id;
+    uint16_t command;
+    uint32_t size;
+    uint32_t flags;
+    uint32_t error_reply;
+} VFIOUserHdr;
+
+/* VFIOUserHdr commands */
+enum vfio_user_command {
+    VFIO_USER_VERSION                   = 1,
+    VFIO_USER_DMA_MAP                   = 2,
+    VFIO_USER_DMA_UNMAP                 = 3,
+    VFIO_USER_DEVICE_GET_INFO           = 4,
+    VFIO_USER_DEVICE_GET_REGION_INFO    = 5,
+    VFIO_USER_DEVICE_GET_REGION_IO_FDS  = 6,
+    VFIO_USER_DEVICE_GET_IRQ_INFO       = 7,
+    VFIO_USER_DEVICE_SET_IRQS           = 8,
+    VFIO_USER_REGION_READ               = 9,
+    VFIO_USER_REGION_WRITE              = 10,
+    VFIO_USER_DMA_READ                  = 11,
+    VFIO_USER_DMA_WRITE                 = 12,
+    VFIO_USER_DEVICE_RESET              = 13,
+    VFIO_USER_DIRTY_PAGES               = 14,
+    VFIO_USER_MAX,
+};
+
+/* VFIOUserHdr flags */
+#define VFIO_USER_REQUEST       0x0
+#define VFIO_USER_REPLY         0x1
+#define VFIO_USER_TYPE          0xF
+
+#define VFIO_USER_NO_REPLY      0x10
+#define VFIO_USER_ERROR         0x20
+
+#endif /* VFIO_USER_PROTOCOL_H */
diff --git a/hw/vfio/user.h b/hw/vfio/user.h
index da92862..72eefa7 100644
--- a/hw/vfio/user.h
+++ b/hw/vfio/user.h
@@ -11,6 +11,8 @@ 
  *
  */
 
+#include "user-protocol.h"
+
 typedef struct {
     int send_fds;
     int recv_fds;
@@ -27,6 +29,7 @@  enum msg_type {
 
 typedef struct VFIOUserMsg {
     QTAILQ_ENTRY(VFIOUserMsg) next;
+    VFIOUserHdr *hdr;
     VFIOUserFDs *fds;
     uint32_t rsize;
     uint32_t id;
@@ -74,5 +77,8 @@  typedef struct VFIOProxy {
 
 VFIOProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp);
 void vfio_user_disconnect(VFIOProxy *proxy);
+void vfio_user_set_handler(VFIODevice *vbasedev,
+                           void (*handler)(void *opaque, VFIOUserMsg *msg),
+                           void *reqarg);
 
 #endif /* VFIO_USER_H */
diff --git a/hw/vfio/pci.c b/hw/vfio/pci.c
index 9fd7c07..0de915d 100644
--- a/hw/vfio/pci.c
+++ b/hw/vfio/pci.c
@@ -3386,6 +3386,11 @@  type_init(register_vfio_pci_dev_type)
  * vfio-user routines.
  */
 
+static void vfio_user_pci_process_req(void *opaque, VFIOUserMsg *msg)
+{
+
+}
+
 /*
  * Emulated devices don't use host hot reset
  */
@@ -3432,6 +3437,7 @@  static void vfio_user_pci_realize(PCIDevice *pdev, Error **errp)
         return;
     }
     vbasedev->proxy = proxy;
+    vfio_user_set_handler(vbasedev, vfio_user_pci_process_req, vdev);
 
     vbasedev->name = g_strdup_printf("VFIO user <%s>", udev->sock_name);
     vbasedev->dev = DEVICE(vdev);
diff --git a/hw/vfio/user.c b/hw/vfio/user.c
index c843f90..e1dfd5d 100644
--- a/hw/vfio/user.c
+++ b/hw/vfio/user.c
@@ -25,10 +25,26 @@ 
 #include "sysemu/iothread.h"
 #include "user.h"
 
+static uint64_t max_xfer_size;
 static IOThread *vfio_user_iothread;
 
 static void vfio_user_shutdown(VFIOProxy *proxy);
+static VFIOUserMsg *vfio_user_getmsg(VFIOProxy *proxy, VFIOUserHdr *hdr,
+                                     VFIOUserFDs *fds);
+static VFIOUserFDs *vfio_user_getfds(int numfds);
+static void vfio_user_recycle(VFIOProxy *proxy, VFIOUserMsg *msg);
 
+static void vfio_user_recv(void *opaque);
+static int vfio_user_recv_one(VFIOProxy *proxy);
+static void vfio_user_cb(void *opaque);
+
+static void vfio_user_request(void *opaque);
+
+static inline void vfio_user_set_error(VFIOUserHdr *hdr, uint32_t err)
+{
+    hdr->flags |= VFIO_USER_ERROR;
+    hdr->error_reply = err;
+}
 
 /*
  * Functions called by main, CPU, or iothread threads
@@ -40,10 +56,261 @@  static void vfio_user_shutdown(VFIOProxy *proxy)
     qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx, NULL, NULL, NULL);
 }
 
+static VFIOUserMsg *vfio_user_getmsg(VFIOProxy *proxy, VFIOUserHdr *hdr,
+                                     VFIOUserFDs *fds)
+{
+    VFIOUserMsg *msg;
+
+    msg = QTAILQ_FIRST(&proxy->free);
+    if (msg != NULL) {
+        QTAILQ_REMOVE(&proxy->free, msg, next);
+    } else {
+        msg = g_malloc0(sizeof(*msg));
+        qemu_cond_init(&msg->cv);
+    }
+
+    msg->hdr = hdr;
+    msg->fds = fds;
+    return msg;
+}
+
+/*
+ * Recycle a message list entry to the free list.
+ */
+static void vfio_user_recycle(VFIOProxy *proxy, VFIOUserMsg *msg)
+{
+    if (msg->type == VFIO_MSG_NONE) {
+        error_printf("vfio_user_recycle - freeing free msg\n");
+        return;
+    }
+
+    /* free msg buffer if no one is waiting to consume the reply */
+    if (msg->type == VFIO_MSG_NOWAIT || msg->type == VFIO_MSG_ASYNC) {
+        g_free(msg->hdr);
+        if (msg->fds != NULL) {
+            g_free(msg->fds);
+        }
+    }
+
+    msg->type = VFIO_MSG_NONE;
+    msg->hdr = NULL;
+    msg->fds = NULL;
+    msg->complete = false;
+    QTAILQ_INSERT_HEAD(&proxy->free, msg, next);
+}
+
+static VFIOUserFDs *vfio_user_getfds(int numfds)
+{
+    VFIOUserFDs *fds = g_malloc0(sizeof(*fds) + (numfds * sizeof(int)));
+
+    fds->fds = (int *)((char *)fds + sizeof(*fds));
+
+    return fds;
+}
+
 /*
  * Functions only called by iothread
  */
 
+static void vfio_user_recv(void *opaque)
+{
+    VFIOProxy *proxy = opaque;
+
+    QEMU_LOCK_GUARD(&proxy->lock);
+
+    if (proxy->state == VFIO_PROXY_CONNECTED) {
+        while (vfio_user_recv_one(proxy) == 0) {
+            ;
+        }
+    }
+}
+
+/*
+ * Receive and process one incoming message.
+ *
+ * For replies, find matching outgoing request and wake any waiters.
+ * For requests, queue in incoming list and run request BH.
+ */
+static int vfio_user_recv_one(VFIOProxy *proxy)
+{
+    VFIOUserMsg *msg = NULL;
+    g_autofree int *fdp = NULL;
+    VFIOUserFDs *reqfds;
+    VFIOUserHdr hdr;
+    struct iovec iov = {
+        .iov_base = &hdr,
+        .iov_len = sizeof(hdr),
+    };
+    bool isreply = false;
+    int i, ret;
+    size_t msgleft, numfds = 0;
+    char *data = NULL;
+    char *buf = NULL;
+    Error *local_err = NULL;
+
+    /*
+     * Read header
+     */
+    ret = qio_channel_readv_full(proxy->ioc, &iov, 1, &fdp, &numfds,
+                                 &local_err);
+    if (ret == QIO_CHANNEL_ERR_BLOCK) {
+        return ret;
+    }
+    if (ret <= 0) {
+        /* read error or other side closed connection */
+        if (ret == 0) {
+            error_setg(&local_err, "vfio_user_recv server closed socket");
+        } else {
+            error_prepend(&local_err, "vfio_user_recv");
+        }
+        goto fatal;
+    }
+    if (ret < sizeof(hdr)) {
+        error_setg(&local_err, "vfio_user_recv short read of header");
+        goto fatal;
+    }
+
+    /*
+     * Validate header
+     */
+    if (hdr.size < sizeof(VFIOUserHdr)) {
+        error_setg(&local_err, "vfio_user_recv bad header size");
+        goto fatal;
+    }
+    switch (hdr.flags & VFIO_USER_TYPE) {
+    case VFIO_USER_REQUEST:
+        isreply = false;
+        break;
+    case VFIO_USER_REPLY:
+        isreply = true;
+        break;
+    default:
+        error_setg(&local_err, "vfio_user_recv unknown message type");
+        goto fatal;
+    }
+
+    /*
+     * For replies, find the matching pending request.
+     * For requests, reap incoming FDs.
+     */
+    if (isreply) {
+        QTAILQ_FOREACH(msg, &proxy->pending, next) {
+            if (hdr.id == msg->id) {
+                break;
+            }
+        }
+        if (msg == NULL) {
+            error_setg(&local_err, "vfio_user_recv unexpected reply");
+            goto err;
+        }
+        QTAILQ_REMOVE(&proxy->pending, msg, next);
+
+        /*
+         * Process any received FDs
+         */
+        if (numfds != 0) {
+            if (msg->fds == NULL || msg->fds->recv_fds < numfds) {
+                error_setg(&local_err, "vfio_user_recv unexpected FDs");
+                goto err;
+            }
+            msg->fds->recv_fds = numfds;
+            memcpy(msg->fds->fds, fdp, numfds * sizeof(int));
+        }
+    } else {
+        if (numfds != 0) {
+            reqfds = vfio_user_getfds(numfds);
+            memcpy(reqfds->fds, fdp, numfds * sizeof(int));
+        } else {
+            reqfds = NULL;
+        }
+    }
+
+    /*
+     * Put the whole message into a single buffer.
+     */
+    if (isreply) {
+        if (hdr.size > msg->rsize) {
+            error_setg(&local_err,
+                       "vfio_user_recv reply larger than recv buffer");
+            goto err;
+        }
+        *msg->hdr = hdr;
+        data = (char *)msg->hdr + sizeof(hdr);
+    } else {
+        if (hdr.size > max_xfer_size) {
+            error_setg(&local_err, "vfio_user_recv request larger than max");
+            goto err;
+        }
+        buf = g_malloc0(hdr.size);
+        memcpy(buf, &hdr, sizeof(hdr));
+        data = buf + sizeof(hdr);
+        msg = vfio_user_getmsg(proxy, (VFIOUserHdr *)buf, reqfds);
+        msg->type = VFIO_MSG_REQ;
+    }
+
+    msgleft = hdr.size - sizeof(hdr);
+    while (msgleft > 0) {
+        ret = qio_channel_read(proxy->ioc, data, msgleft, &local_err);
+
+        /* error or would block */
+        if (ret < 0) {
+            goto fatal;
+        }
+
+        msgleft -= ret;
+        data += ret;
+    }
+
+    /*
+     * Replies signal a waiter, if none just check for errors
+     * and free the message buffer.
+     *
+     * Requests get queued for the BH.
+     */
+    if (isreply) {
+        msg->complete = true;
+        if (msg->type == VFIO_MSG_WAIT) {
+            qemu_cond_signal(&msg->cv);
+        } else {
+            if (hdr.flags & VFIO_USER_ERROR) {
+                error_printf("vfio_user_rcv error reply on async request ");
+                error_printf("command %x error %s\n", hdr.command,
+                             strerror(hdr.error_reply));
+            }
+            /* youngest nowait msg has been ack'd */
+            if (proxy->last_nowait == msg) {
+                proxy->last_nowait = NULL;
+            }
+            vfio_user_recycle(proxy, msg);
+        }
+    } else {
+        QTAILQ_INSERT_TAIL(&proxy->incoming, msg, next);
+        qemu_bh_schedule(proxy->req_bh);
+    }
+    return 0;
+
+    /*
+     * fatal means the other side closed or we don't trust the stream
+     * err means this message is corrupt
+     */
+fatal:
+    vfio_user_shutdown(proxy);
+    proxy->state = VFIO_PROXY_ERROR;
+
+err:
+    for (i = 0; i < numfds; i++) {
+        close(fdp[i]);
+    }
+    if (isreply && msg != NULL) {
+        /* force an error to keep sending thread from hanging */
+        vfio_user_set_error(msg->hdr, EINVAL);
+        msg->complete = true;
+        qemu_cond_signal(&msg->cv);
+    }
+    error_report_err(local_err);
+    return -1;
+}
+
 static void vfio_user_cb(void *opaque)
 {
     VFIOProxy *proxy = opaque;
@@ -59,6 +326,51 @@  static void vfio_user_cb(void *opaque)
  * Functions called by main or CPU threads
  */
 
+/*
+ * Process incoming requests.
+ *
+ * The bus-specific callback has the form:
+ *    request(opaque, msg)
+ * where 'opaque' was specified in vfio_user_set_handler
+ * and 'msg' is the inbound message.
+ *
+ * The callback is responsible for disposing of the message buffer,
+ * usually by re-using it when calling vfio_send_reply or vfio_send_error,
+ * both of which free their message buffer when the reply is sent.
+ *
+ * If the callback uses a new buffer, it needs to free the old one.
+ */
+static void vfio_user_request(void *opaque)
+{
+    VFIOProxy *proxy = opaque;
+    VFIOUserMsgQ new, free;
+    VFIOUserMsg *msg, *m1;
+
+    /* reap all incoming */
+    QTAILQ_INIT(&new);
+    WITH_QEMU_LOCK_GUARD(&proxy->lock) {
+        QTAILQ_FOREACH_SAFE(msg, &proxy->incoming, next, m1) {
+            QTAILQ_REMOVE(&proxy->incoming, msg, next);
+            QTAILQ_INSERT_TAIL(&new, msg, next);
+        }
+    }
+
+    /* process list */
+    QTAILQ_INIT(&free);
+    QTAILQ_FOREACH_SAFE(msg, &new, next, m1) {
+        QTAILQ_REMOVE(&new, msg, next);
+        proxy->request(proxy->req_arg, msg);
+        QTAILQ_INSERT_HEAD(&free, msg, next);
+    }
+
+    /* free list */
+    WITH_QEMU_LOCK_GUARD(&proxy->lock) {
+        QTAILQ_FOREACH_SAFE(msg, &free, next, m1) {
+            vfio_user_recycle(proxy, msg);
+        }
+    }
+}
+
 static QLIST_HEAD(, VFIOProxy) vfio_user_sockets =
     QLIST_HEAD_INITIALIZER(vfio_user_sockets);
 
@@ -97,6 +409,7 @@  VFIOProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp)
     }
 
     proxy->ctx = iothread_get_aio_context(vfio_user_iothread);
+    proxy->req_bh = qemu_bh_new(vfio_user_request, proxy);
 
     QTAILQ_INIT(&proxy->outgoing);
     QTAILQ_INIT(&proxy->incoming);
@@ -107,6 +420,18 @@  VFIOProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp)
     return proxy;
 }
 
+void vfio_user_set_handler(VFIODevice *vbasedev,
+                           void (*handler)(void *opaque, VFIOUserMsg *msg),
+                           void *req_arg)
+{
+    VFIOProxy *proxy = vbasedev->proxy;
+
+    proxy->request = handler;
+    proxy->req_arg = req_arg;
+    qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx,
+                                   vfio_user_recv, NULL, proxy);
+}
+
 void vfio_user_disconnect(VFIOProxy *proxy)
 {
     VFIOUserMsg *r1, *r2;
@@ -122,6 +447,8 @@  void vfio_user_disconnect(VFIOProxy *proxy)
     }
     object_unref(OBJECT(proxy->ioc));
     proxy->ioc = NULL;
+    qemu_bh_delete(proxy->req_bh);
+    proxy->req_bh = NULL;
 
     proxy->state = VFIO_PROXY_CLOSING;
     QTAILQ_FOREACH_SAFE(r1, &proxy->outgoing, next, r2) {
diff --git a/MAINTAINERS b/MAINTAINERS
index cfaccbf..bc0ba88 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -1909,6 +1909,7 @@  S: Supported
 F: docs/devel/vfio-user.rst
 F: hw/vfio/user.c
 F: hw/vfio/user.h
+F: hw/vfio/user-protocol.h
 
 vhost
 M: Michael S. Tsirkin <mst@redhat.com>