diff mbox series

[v1,08/24] vfio-user: define socket receive functions

Message ID 297c4b40b5f99c26ad78c7d2a729db436951b5f9.1667542066.git.john.g.johnson@oracle.com (mailing list archive)
State New, archived
Headers show
Series vfio-user client | expand

Commit Message

John Johnson Nov. 8, 2022, 11:13 p.m. UTC
Add infrastructure needed to receive incoming messages

Signed-off-by: John G Johnson <john.g.johnson@oracle.com>
Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
---
 MAINTAINERS             |   1 +
 hw/vfio/pci.c           |  11 ++
 hw/vfio/user-protocol.h |  54 +++++++
 hw/vfio/user.c          | 399 ++++++++++++++++++++++++++++++++++++++++++++++++
 hw/vfio/user.h          |   8 +
 5 files changed, 473 insertions(+)
 create mode 100644 hw/vfio/user-protocol.h

Comments

John Levon Dec. 9, 2022, 3:34 p.m. UTC | #1
On Tue, Nov 08, 2022 at 03:13:30PM -0800, John Johnson wrote:

> Add infrastructure needed to receive incoming messages
> 
> +static void vfio_user_process(VFIOProxy *proxy, VFIOUserMsg *msg, bool isreply)
> +{
> +
> +    /*
> +     * Replies signal a waiter, if none just check for errors
> +     * and free the message buffer.
> +     *
> +     * Requests get queued for the BH.
> +     */
> +    if (isreply) {
> +        msg->complete = true;
> +        if (msg->type == VFIO_MSG_WAIT) {
> +            qemu_cond_signal(&msg->cv);
> +        } else {
> +            if (msg->hdr->flags & VFIO_USER_ERROR) {
> +                error_printf("vfio_user_rcv:  error reply on async request ");

nit, s/vfio_user_rcv/vfio_user_process/ , or even __func__ ?

> +    error_prepend(&local_err, "vfio_user_recv: ");

nit, same here

regards
john
Cédric Le Goater Dec. 13, 2022, 10:45 a.m. UTC | #2
On 11/9/22 00:13, John Johnson wrote:
> Add infrastructure needed to receive incoming messages
> 
> Signed-off-by: John G Johnson <john.g.johnson@oracle.com>
> Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
> Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
> ---
>   MAINTAINERS             |   1 +
>   hw/vfio/pci.c           |  11 ++
>   hw/vfio/user-protocol.h |  54 +++++++
>   hw/vfio/user.c          | 399 ++++++++++++++++++++++++++++++++++++++++++++++++
>   hw/vfio/user.h          |   8 +
>   5 files changed, 473 insertions(+)
>   create mode 100644 hw/vfio/user-protocol.h>
> diff --git a/MAINTAINERS b/MAINTAINERS
> index 5d64d02..b6c186b 100644
> --- a/MAINTAINERS
> +++ b/MAINTAINERS
> @@ -1993,6 +1993,7 @@ S: Supported
>   F: docs/devel/vfio-user.rst
>   F: hw/vfio/user.c
>   F: hw/vfio/user.h
> +F: hw/vfio/user-protocol.h
>   
>   vhost
>   M: Michael S. Tsirkin <mst@redhat.com>
> diff --git a/hw/vfio/pci.c b/hw/vfio/pci.c
> index e5f2413..f086235 100644
> --- a/hw/vfio/pci.c
> +++ b/hw/vfio/pci.c
> @@ -3432,6 +3432,16 @@ type_init(register_vfio_pci_dev_type)
>    */
>   
>   /*
> + * Incoming request message callback.
> + *
> + * Runs off main loop, so BQL held.
> + */
> +static void vfio_user_pci_process_req(void *opaque, VFIOUserMsg *msg)
> +{
> +
> +}
> +
> +/*
>    * Emulated devices don't use host hot reset
>    */
>   static void vfio_user_compute_needs_reset(VFIODevice *vbasedev)
> @@ -3477,6 +3487,7 @@ static void vfio_user_pci_realize(PCIDevice *pdev, Error **errp)
>           return;
>       }
>       vbasedev->proxy = proxy;
> +    vfio_user_set_handler(vbasedev, vfio_user_pci_process_req, vdev);
>   
>       vbasedev->name = g_strdup_printf("VFIO user <%s>", udev->sock_name);
>       vbasedev->ops = &vfio_user_pci_ops;
> diff --git a/hw/vfio/user-protocol.h b/hw/vfio/user-protocol.h
> new file mode 100644
> index 0000000..d23877c
> --- /dev/null
> +++ b/hw/vfio/user-protocol.h
> @@ -0,0 +1,54 @@
> +#ifndef VFIO_USER_PROTOCOL_H
> +#define VFIO_USER_PROTOCOL_H
> +
> +/*
> + * vfio protocol over a UNIX socket.
> + *
> + * Copyright © 2018, 2021 Oracle and/or its affiliates.
> + *
> + * This work is licensed under the terms of the GNU GPL, version 2.  See
> + * the COPYING file in the top-level directory.
> + *
> + * Each message has a standard header that describes the command
> + * being sent, which is almost always a VFIO ioctl().
> + *
> + * The header may be followed by command-specific data, such as the
> + * region and offset info for read and write commands.
> + */
> +
> +typedef struct {
> +    uint16_t id;
> +    uint16_t command;
> +    uint32_t size;
> +    uint32_t flags;
> +    uint32_t error_reply;
> +} VFIOUserHdr;
> +
> +/* VFIOUserHdr commands */
> +enum vfio_user_command {
> +    VFIO_USER_VERSION                   = 1,
> +    VFIO_USER_DMA_MAP                   = 2,
> +    VFIO_USER_DMA_UNMAP                 = 3,
> +    VFIO_USER_DEVICE_GET_INFO           = 4,
> +    VFIO_USER_DEVICE_GET_REGION_INFO    = 5,
> +    VFIO_USER_DEVICE_GET_REGION_IO_FDS  = 6,
> +    VFIO_USER_DEVICE_GET_IRQ_INFO       = 7,
> +    VFIO_USER_DEVICE_SET_IRQS           = 8,
> +    VFIO_USER_REGION_READ               = 9,
> +    VFIO_USER_REGION_WRITE              = 10,
> +    VFIO_USER_DMA_READ                  = 11,
> +    VFIO_USER_DMA_WRITE                 = 12,
> +    VFIO_USER_DEVICE_RESET              = 13,
> +    VFIO_USER_DIRTY_PAGES               = 14,
> +    VFIO_USER_MAX,
> +};
> +
> +/* VFIOUserHdr flags */
> +#define VFIO_USER_REQUEST       0x0
> +#define VFIO_USER_REPLY         0x1
> +#define VFIO_USER_TYPE          0xF
> +
> +#define VFIO_USER_NO_REPLY      0x10
> +#define VFIO_USER_ERROR         0x20
> +
> +#endif /* VFIO_USER_PROTOCOL_H */
> diff --git a/hw/vfio/user.c b/hw/vfio/user.c
> index 4f09060..ffd69b9 100644
> --- a/hw/vfio/user.c
> +++ b/hw/vfio/user.c
> @@ -28,7 +28,22 @@
>   static IOThread *vfio_user_iothread;
>   
>   static void vfio_user_shutdown(VFIOProxy *proxy);
> +static VFIOUserMsg *vfio_user_getmsg(VFIOProxy *proxy, VFIOUserHdr *hdr,
> +                                     VFIOUserFDs *fds);
> +static VFIOUserFDs *vfio_user_getfds(int numfds);
> +static void vfio_user_recycle(VFIOProxy *proxy, VFIOUserMsg *msg);
>   
> +static void vfio_user_recv(void *opaque);
> +static int vfio_user_recv_one(VFIOProxy *proxy);
> +static void vfio_user_cb(void *opaque);
> +
> +static void vfio_user_request(void *opaque);
> +
> +static inline void vfio_user_set_error(VFIOUserHdr *hdr, uint32_t err)
> +{
> +    hdr->flags |= VFIO_USER_ERROR;
> +    hdr->error_reply = err;
> +}
>   
>   /*
>    * Functions called by main, CPU, or iothread threads
> @@ -40,10 +55,334 @@ static void vfio_user_shutdown(VFIOProxy *proxy)
>       qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx, NULL, NULL, NULL);
>   }
>   
> +static VFIOUserMsg *vfio_user_getmsg(VFIOProxy *proxy, VFIOUserHdr *hdr,
> +                                     VFIOUserFDs *fds)
> +{
> +    VFIOUserMsg *msg;
> +
> +    msg = QTAILQ_FIRST(&proxy->free);
> +    if (msg != NULL) {
> +        QTAILQ_REMOVE(&proxy->free, msg, next);
> +    } else {
> +        msg = g_malloc0(sizeof(*msg));
> +        qemu_cond_init(&msg->cv);
> +    }
> +
> +    msg->hdr = hdr;
> +    msg->fds = fds;
> +    return msg;
> +}
> +
> +/*
> + * Recycle a message list entry to the free list.
> + */
> +static void vfio_user_recycle(VFIOProxy *proxy, VFIOUserMsg *msg)
> +{
> +    if (msg->type == VFIO_MSG_NONE) {
> +        error_printf("vfio_user_recycle - freeing free msg\n");
> +        return;
> +    }
> +
> +    /* free msg buffer if no one is waiting to consume the reply */
> +    if (msg->type == VFIO_MSG_NOWAIT || msg->type == VFIO_MSG_ASYNC) {
> +        g_free(msg->hdr);
> +        if (msg->fds != NULL) {
> +            g_free(msg->fds);
> +        }
> +    }
> +
> +    msg->type = VFIO_MSG_NONE;
> +    msg->hdr = NULL;
> +    msg->fds = NULL;
> +    msg->complete = false;
> +    QTAILQ_INSERT_HEAD(&proxy->free, msg, next);
> +}
> +
> +static VFIOUserFDs *vfio_user_getfds(int numfds)
> +{
> +    VFIOUserFDs *fds = g_malloc0(sizeof(*fds) + (numfds * sizeof(int)));
> +
> +    fds->fds = (int *)((char *)fds + sizeof(*fds));

may be change the fds field of VFIOUserFDs struct to fds[];

> +
> +    return fds;
> +}
> +
>   /*
>    * Functions only called by iothread
>    */
>   
> +/*
> + * Process a received message.
> + */
> +static void vfio_user_process(VFIOProxy *proxy, VFIOUserMsg *msg, bool isreply)
> +{
> +
> +    /*
> +     * Replies signal a waiter, if none just check for errors
> +     * and free the message buffer.
> +     *
> +     * Requests get queued for the BH.
> +     */
> +    if (isreply) {
> +        msg->complete = true;
> +        if (msg->type == VFIO_MSG_WAIT) {
> +            qemu_cond_signal(&msg->cv);
> +        } else {
> +            if (msg->hdr->flags & VFIO_USER_ERROR) {
> +                error_printf("vfio_user_rcv:  error reply on async request ");
> +                error_printf("command %x error %s\n", msg->hdr->command,
> +                             strerror(msg->hdr->error_reply));
> +            }
> +            /* youngest nowait msg has been ack'd */
> +            if (proxy->last_nowait == msg) {
> +                proxy->last_nowait = NULL;
> +            }
> +            vfio_user_recycle(proxy, msg);
> +        }
> +    } else {
> +        QTAILQ_INSERT_TAIL(&proxy->incoming, msg, next);
> +        qemu_bh_schedule(proxy->req_bh);
> +    }
> +}
> +
> +/*
> + * Complete a partial message read
> + */
> +static int vfio_user_complete(VFIOProxy *proxy, Error **errp)
> +{
> +    VFIOUserMsg *msg = proxy->part_recv;
> +    size_t msgleft = proxy->recv_left;
> +    bool isreply;
> +    char *data;
> +    int ret;
> +
> +    data = (char *)msg->hdr + (msg->hdr->size - msgleft);
> +    while (msgleft > 0) {
> +        ret = qio_channel_read(proxy->ioc, data, msgleft, errp);
> +
> +        /* error or would block */
> +        if (ret <= 0) {
> +            /* try for rest on next iternation */
> +            if (ret == QIO_CHANNEL_ERR_BLOCK) {
> +                proxy->recv_left = msgleft;
> +            }
> +            return ret;
> +        }
> +
> +        msgleft -= ret;
> +        data += ret;
> +    }
> +
> +    /*
> +     * Read complete message, process it.
> +     */
> +    proxy->part_recv = NULL;
> +    proxy->recv_left = 0;
> +    isreply = (msg->hdr->flags & VFIO_USER_TYPE) == VFIO_USER_REPLY;
> +    vfio_user_process(proxy, msg, isreply);
> +
> +    /* return positive value */
> +    return 1;
> +}
> +
> +static void vfio_user_recv(void *opaque)
> +{
> +    VFIOProxy *proxy = opaque;
> +
> +    QEMU_LOCK_GUARD(&proxy->lock);
> +
> +    if (proxy->state == VFIO_PROXY_CONNECTED) {
> +        while (vfio_user_recv_one(proxy) == 0) {
> +            ;
> +        }
> +    }
> +}
> +
> +/*
> + * Receive and process one incoming message.
> + *
> + * For replies, find matching outgoing request and wake any waiters.
> + * For requests, queue in incoming list and run request BH.
> + */
> +static int vfio_user_recv_one(VFIOProxy *proxy)
> +{
> +    VFIOUserMsg *msg = NULL;
> +    g_autofree int *fdp = NULL;
> +    VFIOUserFDs *reqfds;
> +    VFIOUserHdr hdr;
> +    struct iovec iov = {
> +        .iov_base = &hdr,
> +        .iov_len = sizeof(hdr),
> +    };
> +    bool isreply = false;
> +    int i, ret;
> +    size_t msgleft, numfds = 0;
> +    char *data = NULL;
> +    char *buf = NULL;
> +    Error *local_err = NULL;
> +
> +    /*
> +     * Complete any partial reads
> +     */
> +    if (proxy->part_recv != NULL) {
> +        ret = vfio_user_complete(proxy, &local_err);
> +
> +        /* still not complete, try later */
> +        if (ret == QIO_CHANNEL_ERR_BLOCK) {
> +            return ret;
> +        }
> +
> +        if (ret <= 0) {
> +            goto fatal;
> +        }
> +        /* else fall into reading another msg */
> +    }
> +
> +    /*
> +     * Read header
> +     */
> +    ret = qio_channel_readv_full(proxy->ioc, &iov, 1, &fdp, &numfds,
> +                                 &local_err);
> +    if (ret == QIO_CHANNEL_ERR_BLOCK) {
> +        return ret;
> +    }
> +
> +    /* read error or other side closed connection */
> +    if (ret <= 0) {
> +        goto fatal;
> +    }
> +
> +    if (ret < sizeof(msg)) {
> +        error_setg(&local_err, "short read of header");
> +        goto fatal;
> +    }
> +
> +    /*
> +     * Validate header
> +     */
> +    if (hdr.size < sizeof(VFIOUserHdr)) {
> +        error_setg(&local_err, "bad header size");
> +        goto fatal;
> +    }
> +    switch (hdr.flags & VFIO_USER_TYPE) {
> +    case VFIO_USER_REQUEST:
> +        isreply = false;
> +        break;
> +    case VFIO_USER_REPLY:
> +        isreply = true;
> +        break;
> +    default:
> +        error_setg(&local_err, "unknown message type");
> +        goto fatal;
> +    }
> +
> +    /*
> +     * For replies, find the matching pending request.
> +     * For requests, reap incoming FDs.
> +     */
> +    if (isreply) {
> +        QTAILQ_FOREACH(msg, &proxy->pending, next) {
> +            if (hdr.id == msg->id) {
> +                break;
> +            }
> +        }
> +        if (msg == NULL) {
> +            error_setg(&local_err, "unexpected reply");
> +            goto err;
> +        }
> +        QTAILQ_REMOVE(&proxy->pending, msg, next);
> +
> +        /*
> +         * Process any received FDs
> +         */
> +        if (numfds != 0) {
> +            if (msg->fds == NULL || msg->fds->recv_fds < numfds) {
> +                error_setg(&local_err, "unexpected FDs");
> +                goto err;
> +            }
> +            msg->fds->recv_fds = numfds;
> +            memcpy(msg->fds->fds, fdp, numfds * sizeof(int));
> +        }
> +    } else {
> +        if (numfds != 0) {
> +            reqfds = vfio_user_getfds(numfds);
> +            memcpy(reqfds->fds, fdp, numfds * sizeof(int));
> +        } else {
> +            reqfds = NULL;
> +        }
> +    }

Looks good. Pity we can not use the chardev/char-socket.c code.

Thanks,

C.

> +    /*
> +     * Put the whole message into a single buffer.
> +     */
> +    if (isreply) {
> +        if (hdr.size > msg->rsize) {
> +            error_setg(&local_err, "reply larger than recv buffer");
> +            goto err;
> +        }
> +        *msg->hdr = hdr;
> +        data = (char *)msg->hdr + sizeof(hdr);
> +    } else {
> +        buf = g_malloc0(hdr.size);
> +        memcpy(buf, &hdr, sizeof(hdr));
> +        data = buf + sizeof(hdr);
> +        msg = vfio_user_getmsg(proxy, (VFIOUserHdr *)buf, reqfds);
> +        msg->type = VFIO_MSG_REQ;
> +    }
> +
> +    /*
> +     * Read rest of message.
> +     */
> +    msgleft = hdr.size - sizeof(hdr);
> +    while (msgleft > 0) {
> +        ret = qio_channel_read(proxy->ioc, data, msgleft, &local_err);
> +
> +        /* prepare to complete read on next iternation */
> +        if (ret == QIO_CHANNEL_ERR_BLOCK) {
> +            proxy->part_recv = msg;
> +            proxy->recv_left = msgleft;
> +            return ret;
> +        }
> +
> +        if (ret <= 0) {
> +            goto fatal;
> +        }
> +
> +        msgleft -= ret;
> +        data += ret;
> +    }
> +
> +    vfio_user_process(proxy, msg, isreply);
> +    return 0;
> +
> +    /*
> +     * fatal means the other side closed or we don't trust the stream
> +     * err means this message is corrupt
> +     */
> +fatal:
> +    vfio_user_shutdown(proxy);
> +    proxy->state = VFIO_PROXY_ERROR;
> +
> +    /* set error if server side closed */
> +    if (ret == 0) {
> +        error_setg(&local_err, "server closed socket");
> +    }
> +
> +err:
> +    for (i = 0; i < numfds; i++) {
> +        close(fdp[i]);
> +    }
> +    if (isreply && msg != NULL) {
> +        /* force an error to keep sending thread from hanging */
> +        vfio_user_set_error(msg->hdr, EINVAL);
> +        msg->complete = true;
> +        qemu_cond_signal(&msg->cv);
> +    }
> +    error_prepend(&local_err, "vfio_user_recv: ");
> +    error_report_err(local_err);
> +    return -1;
> +}
> +
>   static void vfio_user_cb(void *opaque)
>   {
>       VFIOProxy *proxy = opaque;
> @@ -59,6 +398,51 @@ static void vfio_user_cb(void *opaque)
>    * Functions called by main or CPU threads
>    */
>   
> +/*
> + * Process incoming requests.
> + *
> + * The bus-specific callback has the form:
> + *    request(opaque, msg)
> + * where 'opaque' was specified in vfio_user_set_handler
> + * and 'msg' is the inbound message.
> + *
> + * The callback is responsible for disposing of the message buffer,
> + * usually by re-using it when calling vfio_send_reply or vfio_send_error,
> + * both of which free their message buffer when the reply is sent.
> + *
> + * If the callback uses a new buffer, it needs to free the old one.
> + */
> +static void vfio_user_request(void *opaque)
> +{
> +    VFIOProxy *proxy = opaque;
> +    VFIOUserMsgQ new, free;
> +    VFIOUserMsg *msg, *m1;
> +
> +    /* reap all incoming */
> +    QTAILQ_INIT(&new);
> +    WITH_QEMU_LOCK_GUARD(&proxy->lock) {
> +        QTAILQ_FOREACH_SAFE(msg, &proxy->incoming, next, m1) {
> +            QTAILQ_REMOVE(&proxy->incoming, msg, next);
> +            QTAILQ_INSERT_TAIL(&new, msg, next);
> +        }
> +    }
> +
> +    /* process list */
> +    QTAILQ_INIT(&free);
> +    QTAILQ_FOREACH_SAFE(msg, &new, next, m1) {
> +        QTAILQ_REMOVE(&new, msg, next);
> +        proxy->request(proxy->req_arg, msg);
> +        QTAILQ_INSERT_HEAD(&free, msg, next);
> +    }
> +
> +    /* free list */
> +    WITH_QEMU_LOCK_GUARD(&proxy->lock) {
> +        QTAILQ_FOREACH_SAFE(msg, &free, next, m1) {
> +            vfio_user_recycle(proxy, msg);
> +        }
> +    }
> +}
> +
>   static QLIST_HEAD(, VFIOProxy) vfio_user_sockets =
>       QLIST_HEAD_INITIALIZER(vfio_user_sockets);
>   
> @@ -97,6 +481,7 @@ VFIOProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp)
>       }
>   
>       proxy->ctx = iothread_get_aio_context(vfio_user_iothread);
> +    proxy->req_bh = qemu_bh_new(vfio_user_request, proxy);
>   
>       QTAILQ_INIT(&proxy->outgoing);
>       QTAILQ_INIT(&proxy->incoming);
> @@ -107,6 +492,18 @@ VFIOProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp)
>       return proxy;
>   }
>   
> +void vfio_user_set_handler(VFIODevice *vbasedev,
> +                           void (*handler)(void *opaque, VFIOUserMsg *msg),
> +                           void *req_arg)
> +{
> +    VFIOProxy *proxy = vbasedev->proxy;
> +
> +    proxy->request = handler;
> +    proxy->req_arg = req_arg;
> +    qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx,
> +                                   vfio_user_recv, NULL, proxy);
> +}
> +
>   void vfio_user_disconnect(VFIOProxy *proxy)
>   {
>       VFIOUserMsg *r1, *r2;
> @@ -122,6 +519,8 @@ void vfio_user_disconnect(VFIOProxy *proxy)
>       }
>       object_unref(OBJECT(proxy->ioc));
>       proxy->ioc = NULL;
> +    qemu_bh_delete(proxy->req_bh);
> +    proxy->req_bh = NULL;
>   
>       proxy->state = VFIO_PROXY_CLOSING;
>       QTAILQ_FOREACH_SAFE(r1, &proxy->outgoing, next, r2) {
> diff --git a/hw/vfio/user.h b/hw/vfio/user.h
> index da92862..68a1080 100644
> --- a/hw/vfio/user.h
> +++ b/hw/vfio/user.h
> @@ -11,6 +11,8 @@
>    *
>    */
>   
> +#include "user-protocol.h"
> +
>   typedef struct {
>       int send_fds;
>       int recv_fds;
> @@ -27,6 +29,7 @@ enum msg_type {
>   
>   typedef struct VFIOUserMsg {
>       QTAILQ_ENTRY(VFIOUserMsg) next;
> +    VFIOUserHdr *hdr;
>       VFIOUserFDs *fds;
>       uint32_t rsize;
>       uint32_t id;
> @@ -66,6 +69,8 @@ typedef struct VFIOProxy {
>       VFIOUserMsgQ incoming;
>       VFIOUserMsgQ outgoing;
>       VFIOUserMsg *last_nowait;
> +    VFIOUserMsg *part_recv;
> +    size_t recv_left;
>       enum proxy_state state;
>   } VFIOProxy;
>   
> @@ -74,5 +79,8 @@ typedef struct VFIOProxy {
>   
>   VFIOProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp);
>   void vfio_user_disconnect(VFIOProxy *proxy);
> +void vfio_user_set_handler(VFIODevice *vbasedev,
> +                           void (*handler)(void *opaque, VFIOUserMsg *msg),
> +                           void *reqarg);
>   
>   #endif /* VFIO_USER_H */
diff mbox series

Patch

diff --git a/MAINTAINERS b/MAINTAINERS
index 5d64d02..b6c186b 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -1993,6 +1993,7 @@  S: Supported
 F: docs/devel/vfio-user.rst
 F: hw/vfio/user.c
 F: hw/vfio/user.h
+F: hw/vfio/user-protocol.h
 
 vhost
 M: Michael S. Tsirkin <mst@redhat.com>
diff --git a/hw/vfio/pci.c b/hw/vfio/pci.c
index e5f2413..f086235 100644
--- a/hw/vfio/pci.c
+++ b/hw/vfio/pci.c
@@ -3432,6 +3432,16 @@  type_init(register_vfio_pci_dev_type)
  */
 
 /*
+ * Incoming request message callback.
+ *
+ * Runs off main loop, so BQL held.
+ */
+static void vfio_user_pci_process_req(void *opaque, VFIOUserMsg *msg)
+{
+
+}
+
+/*
  * Emulated devices don't use host hot reset
  */
 static void vfio_user_compute_needs_reset(VFIODevice *vbasedev)
@@ -3477,6 +3487,7 @@  static void vfio_user_pci_realize(PCIDevice *pdev, Error **errp)
         return;
     }
     vbasedev->proxy = proxy;
+    vfio_user_set_handler(vbasedev, vfio_user_pci_process_req, vdev);
 
     vbasedev->name = g_strdup_printf("VFIO user <%s>", udev->sock_name);
     vbasedev->ops = &vfio_user_pci_ops;
diff --git a/hw/vfio/user-protocol.h b/hw/vfio/user-protocol.h
new file mode 100644
index 0000000..d23877c
--- /dev/null
+++ b/hw/vfio/user-protocol.h
@@ -0,0 +1,54 @@ 
+#ifndef VFIO_USER_PROTOCOL_H
+#define VFIO_USER_PROTOCOL_H
+
+/*
+ * vfio protocol over a UNIX socket.
+ *
+ * Copyright © 2018, 2021 Oracle and/or its affiliates.
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2.  See
+ * the COPYING file in the top-level directory.
+ *
+ * Each message has a standard header that describes the command
+ * being sent, which is almost always a VFIO ioctl().
+ *
+ * The header may be followed by command-specific data, such as the
+ * region and offset info for read and write commands.
+ */
+
+typedef struct {
+    uint16_t id;
+    uint16_t command;
+    uint32_t size;
+    uint32_t flags;
+    uint32_t error_reply;
+} VFIOUserHdr;
+
+/* VFIOUserHdr commands */
+enum vfio_user_command {
+    VFIO_USER_VERSION                   = 1,
+    VFIO_USER_DMA_MAP                   = 2,
+    VFIO_USER_DMA_UNMAP                 = 3,
+    VFIO_USER_DEVICE_GET_INFO           = 4,
+    VFIO_USER_DEVICE_GET_REGION_INFO    = 5,
+    VFIO_USER_DEVICE_GET_REGION_IO_FDS  = 6,
+    VFIO_USER_DEVICE_GET_IRQ_INFO       = 7,
+    VFIO_USER_DEVICE_SET_IRQS           = 8,
+    VFIO_USER_REGION_READ               = 9,
+    VFIO_USER_REGION_WRITE              = 10,
+    VFIO_USER_DMA_READ                  = 11,
+    VFIO_USER_DMA_WRITE                 = 12,
+    VFIO_USER_DEVICE_RESET              = 13,
+    VFIO_USER_DIRTY_PAGES               = 14,
+    VFIO_USER_MAX,
+};
+
+/* VFIOUserHdr flags */
+#define VFIO_USER_REQUEST       0x0
+#define VFIO_USER_REPLY         0x1
+#define VFIO_USER_TYPE          0xF
+
+#define VFIO_USER_NO_REPLY      0x10
+#define VFIO_USER_ERROR         0x20
+
+#endif /* VFIO_USER_PROTOCOL_H */
diff --git a/hw/vfio/user.c b/hw/vfio/user.c
index 4f09060..ffd69b9 100644
--- a/hw/vfio/user.c
+++ b/hw/vfio/user.c
@@ -28,7 +28,22 @@ 
 static IOThread *vfio_user_iothread;
 
 static void vfio_user_shutdown(VFIOProxy *proxy);
+static VFIOUserMsg *vfio_user_getmsg(VFIOProxy *proxy, VFIOUserHdr *hdr,
+                                     VFIOUserFDs *fds);
+static VFIOUserFDs *vfio_user_getfds(int numfds);
+static void vfio_user_recycle(VFIOProxy *proxy, VFIOUserMsg *msg);
 
+static void vfio_user_recv(void *opaque);
+static int vfio_user_recv_one(VFIOProxy *proxy);
+static void vfio_user_cb(void *opaque);
+
+static void vfio_user_request(void *opaque);
+
+static inline void vfio_user_set_error(VFIOUserHdr *hdr, uint32_t err)
+{
+    hdr->flags |= VFIO_USER_ERROR;
+    hdr->error_reply = err;
+}
 
 /*
  * Functions called by main, CPU, or iothread threads
@@ -40,10 +55,334 @@  static void vfio_user_shutdown(VFIOProxy *proxy)
     qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx, NULL, NULL, NULL);
 }
 
+static VFIOUserMsg *vfio_user_getmsg(VFIOProxy *proxy, VFIOUserHdr *hdr,
+                                     VFIOUserFDs *fds)
+{
+    VFIOUserMsg *msg;
+
+    msg = QTAILQ_FIRST(&proxy->free);
+    if (msg != NULL) {
+        QTAILQ_REMOVE(&proxy->free, msg, next);
+    } else {
+        msg = g_malloc0(sizeof(*msg));
+        qemu_cond_init(&msg->cv);
+    }
+
+    msg->hdr = hdr;
+    msg->fds = fds;
+    return msg;
+}
+
+/*
+ * Recycle a message list entry to the free list.
+ */
+static void vfio_user_recycle(VFIOProxy *proxy, VFIOUserMsg *msg)
+{
+    if (msg->type == VFIO_MSG_NONE) {
+        error_printf("vfio_user_recycle - freeing free msg\n");
+        return;
+    }
+
+    /* free msg buffer if no one is waiting to consume the reply */
+    if (msg->type == VFIO_MSG_NOWAIT || msg->type == VFIO_MSG_ASYNC) {
+        g_free(msg->hdr);
+        if (msg->fds != NULL) {
+            g_free(msg->fds);
+        }
+    }
+
+    msg->type = VFIO_MSG_NONE;
+    msg->hdr = NULL;
+    msg->fds = NULL;
+    msg->complete = false;
+    QTAILQ_INSERT_HEAD(&proxy->free, msg, next);
+}
+
+static VFIOUserFDs *vfio_user_getfds(int numfds)
+{
+    VFIOUserFDs *fds = g_malloc0(sizeof(*fds) + (numfds * sizeof(int)));
+
+    fds->fds = (int *)((char *)fds + sizeof(*fds));
+
+    return fds;
+}
+
 /*
  * Functions only called by iothread
  */
 
+/*
+ * Process a received message.
+ */
+static void vfio_user_process(VFIOProxy *proxy, VFIOUserMsg *msg, bool isreply)
+{
+
+    /*
+     * Replies signal a waiter, if none just check for errors
+     * and free the message buffer.
+     *
+     * Requests get queued for the BH.
+     */
+    if (isreply) {
+        msg->complete = true;
+        if (msg->type == VFIO_MSG_WAIT) {
+            qemu_cond_signal(&msg->cv);
+        } else {
+            if (msg->hdr->flags & VFIO_USER_ERROR) {
+                error_printf("vfio_user_rcv:  error reply on async request ");
+                error_printf("command %x error %s\n", msg->hdr->command,
+                             strerror(msg->hdr->error_reply));
+            }
+            /* youngest nowait msg has been ack'd */
+            if (proxy->last_nowait == msg) {
+                proxy->last_nowait = NULL;
+            }
+            vfio_user_recycle(proxy, msg);
+        }
+    } else {
+        QTAILQ_INSERT_TAIL(&proxy->incoming, msg, next);
+        qemu_bh_schedule(proxy->req_bh);
+    }
+}
+
+/*
+ * Complete a partial message read
+ */
+static int vfio_user_complete(VFIOProxy *proxy, Error **errp)
+{
+    VFIOUserMsg *msg = proxy->part_recv;
+    size_t msgleft = proxy->recv_left;
+    bool isreply;
+    char *data;
+    int ret;
+
+    data = (char *)msg->hdr + (msg->hdr->size - msgleft);
+    while (msgleft > 0) {
+        ret = qio_channel_read(proxy->ioc, data, msgleft, errp);
+
+        /* error or would block */
+        if (ret <= 0) {
+            /* try for rest on next iternation */
+            if (ret == QIO_CHANNEL_ERR_BLOCK) {
+                proxy->recv_left = msgleft;
+            }
+            return ret;
+        }
+
+        msgleft -= ret;
+        data += ret;
+    }
+
+    /*
+     * Read complete message, process it.
+     */
+    proxy->part_recv = NULL;
+    proxy->recv_left = 0;
+    isreply = (msg->hdr->flags & VFIO_USER_TYPE) == VFIO_USER_REPLY;
+    vfio_user_process(proxy, msg, isreply);
+
+    /* return positive value */
+    return 1;
+}
+
+static void vfio_user_recv(void *opaque)
+{
+    VFIOProxy *proxy = opaque;
+
+    QEMU_LOCK_GUARD(&proxy->lock);
+
+    if (proxy->state == VFIO_PROXY_CONNECTED) {
+        while (vfio_user_recv_one(proxy) == 0) {
+            ;
+        }
+    }
+}
+
+/*
+ * Receive and process one incoming message.
+ *
+ * For replies, find matching outgoing request and wake any waiters.
+ * For requests, queue in incoming list and run request BH.
+ */
+static int vfio_user_recv_one(VFIOProxy *proxy)
+{
+    VFIOUserMsg *msg = NULL;
+    g_autofree int *fdp = NULL;
+    VFIOUserFDs *reqfds;
+    VFIOUserHdr hdr;
+    struct iovec iov = {
+        .iov_base = &hdr,
+        .iov_len = sizeof(hdr),
+    };
+    bool isreply = false;
+    int i, ret;
+    size_t msgleft, numfds = 0;
+    char *data = NULL;
+    char *buf = NULL;
+    Error *local_err = NULL;
+
+    /*
+     * Complete any partial reads
+     */
+    if (proxy->part_recv != NULL) {
+        ret = vfio_user_complete(proxy, &local_err);
+
+        /* still not complete, try later */
+        if (ret == QIO_CHANNEL_ERR_BLOCK) {
+            return ret;
+        }
+
+        if (ret <= 0) {
+            goto fatal;
+        }
+        /* else fall into reading another msg */
+    }
+
+    /*
+     * Read header
+     */
+    ret = qio_channel_readv_full(proxy->ioc, &iov, 1, &fdp, &numfds,
+                                 &local_err);
+    if (ret == QIO_CHANNEL_ERR_BLOCK) {
+        return ret;
+    }
+
+    /* read error or other side closed connection */
+    if (ret <= 0) {
+        goto fatal;
+    }
+
+    if (ret < sizeof(msg)) {
+        error_setg(&local_err, "short read of header");
+        goto fatal;
+    }
+
+    /*
+     * Validate header
+     */
+    if (hdr.size < sizeof(VFIOUserHdr)) {
+        error_setg(&local_err, "bad header size");
+        goto fatal;
+    }
+    switch (hdr.flags & VFIO_USER_TYPE) {
+    case VFIO_USER_REQUEST:
+        isreply = false;
+        break;
+    case VFIO_USER_REPLY:
+        isreply = true;
+        break;
+    default:
+        error_setg(&local_err, "unknown message type");
+        goto fatal;
+    }
+
+    /*
+     * For replies, find the matching pending request.
+     * For requests, reap incoming FDs.
+     */
+    if (isreply) {
+        QTAILQ_FOREACH(msg, &proxy->pending, next) {
+            if (hdr.id == msg->id) {
+                break;
+            }
+        }
+        if (msg == NULL) {
+            error_setg(&local_err, "unexpected reply");
+            goto err;
+        }
+        QTAILQ_REMOVE(&proxy->pending, msg, next);
+
+        /*
+         * Process any received FDs
+         */
+        if (numfds != 0) {
+            if (msg->fds == NULL || msg->fds->recv_fds < numfds) {
+                error_setg(&local_err, "unexpected FDs");
+                goto err;
+            }
+            msg->fds->recv_fds = numfds;
+            memcpy(msg->fds->fds, fdp, numfds * sizeof(int));
+        }
+    } else {
+        if (numfds != 0) {
+            reqfds = vfio_user_getfds(numfds);
+            memcpy(reqfds->fds, fdp, numfds * sizeof(int));
+        } else {
+            reqfds = NULL;
+        }
+    }
+
+    /*
+     * Put the whole message into a single buffer.
+     */
+    if (isreply) {
+        if (hdr.size > msg->rsize) {
+            error_setg(&local_err, "reply larger than recv buffer");
+            goto err;
+        }
+        *msg->hdr = hdr;
+        data = (char *)msg->hdr + sizeof(hdr);
+    } else {
+        buf = g_malloc0(hdr.size);
+        memcpy(buf, &hdr, sizeof(hdr));
+        data = buf + sizeof(hdr);
+        msg = vfio_user_getmsg(proxy, (VFIOUserHdr *)buf, reqfds);
+        msg->type = VFIO_MSG_REQ;
+    }
+
+    /*
+     * Read rest of message.
+     */
+    msgleft = hdr.size - sizeof(hdr);
+    while (msgleft > 0) {
+        ret = qio_channel_read(proxy->ioc, data, msgleft, &local_err);
+
+        /* prepare to complete read on next iternation */
+        if (ret == QIO_CHANNEL_ERR_BLOCK) {
+            proxy->part_recv = msg;
+            proxy->recv_left = msgleft;
+            return ret;
+        }
+
+        if (ret <= 0) {
+            goto fatal;
+        }
+
+        msgleft -= ret;
+        data += ret;
+    }
+
+    vfio_user_process(proxy, msg, isreply);
+    return 0;
+
+    /*
+     * fatal means the other side closed or we don't trust the stream
+     * err means this message is corrupt
+     */
+fatal:
+    vfio_user_shutdown(proxy);
+    proxy->state = VFIO_PROXY_ERROR;
+
+    /* set error if server side closed */
+    if (ret == 0) {
+        error_setg(&local_err, "server closed socket");
+    }
+
+err:
+    for (i = 0; i < numfds; i++) {
+        close(fdp[i]);
+    }
+    if (isreply && msg != NULL) {
+        /* force an error to keep sending thread from hanging */
+        vfio_user_set_error(msg->hdr, EINVAL);
+        msg->complete = true;
+        qemu_cond_signal(&msg->cv);
+    }
+    error_prepend(&local_err, "vfio_user_recv: ");
+    error_report_err(local_err);
+    return -1;
+}
+
 static void vfio_user_cb(void *opaque)
 {
     VFIOProxy *proxy = opaque;
@@ -59,6 +398,51 @@  static void vfio_user_cb(void *opaque)
  * Functions called by main or CPU threads
  */
 
+/*
+ * Process incoming requests.
+ *
+ * The bus-specific callback has the form:
+ *    request(opaque, msg)
+ * where 'opaque' was specified in vfio_user_set_handler
+ * and 'msg' is the inbound message.
+ *
+ * The callback is responsible for disposing of the message buffer,
+ * usually by re-using it when calling vfio_send_reply or vfio_send_error,
+ * both of which free their message buffer when the reply is sent.
+ *
+ * If the callback uses a new buffer, it needs to free the old one.
+ */
+static void vfio_user_request(void *opaque)
+{
+    VFIOProxy *proxy = opaque;
+    VFIOUserMsgQ new, free;
+    VFIOUserMsg *msg, *m1;
+
+    /* reap all incoming */
+    QTAILQ_INIT(&new);
+    WITH_QEMU_LOCK_GUARD(&proxy->lock) {
+        QTAILQ_FOREACH_SAFE(msg, &proxy->incoming, next, m1) {
+            QTAILQ_REMOVE(&proxy->incoming, msg, next);
+            QTAILQ_INSERT_TAIL(&new, msg, next);
+        }
+    }
+
+    /* process list */
+    QTAILQ_INIT(&free);
+    QTAILQ_FOREACH_SAFE(msg, &new, next, m1) {
+        QTAILQ_REMOVE(&new, msg, next);
+        proxy->request(proxy->req_arg, msg);
+        QTAILQ_INSERT_HEAD(&free, msg, next);
+    }
+
+    /* free list */
+    WITH_QEMU_LOCK_GUARD(&proxy->lock) {
+        QTAILQ_FOREACH_SAFE(msg, &free, next, m1) {
+            vfio_user_recycle(proxy, msg);
+        }
+    }
+}
+
 static QLIST_HEAD(, VFIOProxy) vfio_user_sockets =
     QLIST_HEAD_INITIALIZER(vfio_user_sockets);
 
@@ -97,6 +481,7 @@  VFIOProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp)
     }
 
     proxy->ctx = iothread_get_aio_context(vfio_user_iothread);
+    proxy->req_bh = qemu_bh_new(vfio_user_request, proxy);
 
     QTAILQ_INIT(&proxy->outgoing);
     QTAILQ_INIT(&proxy->incoming);
@@ -107,6 +492,18 @@  VFIOProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp)
     return proxy;
 }
 
+void vfio_user_set_handler(VFIODevice *vbasedev,
+                           void (*handler)(void *opaque, VFIOUserMsg *msg),
+                           void *req_arg)
+{
+    VFIOProxy *proxy = vbasedev->proxy;
+
+    proxy->request = handler;
+    proxy->req_arg = req_arg;
+    qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx,
+                                   vfio_user_recv, NULL, proxy);
+}
+
 void vfio_user_disconnect(VFIOProxy *proxy)
 {
     VFIOUserMsg *r1, *r2;
@@ -122,6 +519,8 @@  void vfio_user_disconnect(VFIOProxy *proxy)
     }
     object_unref(OBJECT(proxy->ioc));
     proxy->ioc = NULL;
+    qemu_bh_delete(proxy->req_bh);
+    proxy->req_bh = NULL;
 
     proxy->state = VFIO_PROXY_CLOSING;
     QTAILQ_FOREACH_SAFE(r1, &proxy->outgoing, next, r2) {
diff --git a/hw/vfio/user.h b/hw/vfio/user.h
index da92862..68a1080 100644
--- a/hw/vfio/user.h
+++ b/hw/vfio/user.h
@@ -11,6 +11,8 @@ 
  *
  */
 
+#include "user-protocol.h"
+
 typedef struct {
     int send_fds;
     int recv_fds;
@@ -27,6 +29,7 @@  enum msg_type {
 
 typedef struct VFIOUserMsg {
     QTAILQ_ENTRY(VFIOUserMsg) next;
+    VFIOUserHdr *hdr;
     VFIOUserFDs *fds;
     uint32_t rsize;
     uint32_t id;
@@ -66,6 +69,8 @@  typedef struct VFIOProxy {
     VFIOUserMsgQ incoming;
     VFIOUserMsgQ outgoing;
     VFIOUserMsg *last_nowait;
+    VFIOUserMsg *part_recv;
+    size_t recv_left;
     enum proxy_state state;
 } VFIOProxy;
 
@@ -74,5 +79,8 @@  typedef struct VFIOProxy {
 
 VFIOProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp);
 void vfio_user_disconnect(VFIOProxy *proxy);
+void vfio_user_set_handler(VFIODevice *vbasedev,
+                           void (*handler)(void *opaque, VFIOUserMsg *msg),
+                           void *reqarg);
 
 #endif /* VFIO_USER_H */