@@ -5,17 +5,40 @@
#include "event_notifier.h"
typedef struct EventHandler EventHandler;
-typedef void EventCallback(EventHandler *handler);
+typedef bool EventCallback(EventHandler *handler);
struct EventHandler
{
- EventNotifier *notifier; /* eventfd */
- EventCallback *callback; /* callback function */
+ EventNotifier *notifier; /* eventfd */
+ EventCallback *callback; /* callback function */
};
typedef struct {
- int epoll_fd; /* epoll(2) file descriptor */
+ int epoll_fd; /* epoll(2) file descriptor */
+ EventNotifier stop_notifier; /* stop poll notifier */
+ EventHandler stop_handler; /* stop poll handler */
} EventPoll;
+/* Add an event notifier and its callback for polling */
+static void event_poll_add(EventPoll *poll, EventHandler *handler, EventNotifier *notifier, EventCallback *callback)
+{
+ struct epoll_event event = {
+ .events = EPOLLIN,
+ .data.ptr = handler,
+ };
+ handler->notifier = notifier;
+ handler->callback = callback;
+ if (epoll_ctl(poll->epoll_fd, EPOLL_CTL_ADD, event_notifier_get_fd(notifier), &event) != 0) {
+ fprintf(stderr, "failed to add event handler to epoll: %m\n");
+ exit(1);
+ }
+}
+
+/* Event callback for stopping the event_poll_run() loop */
+static bool handle_stop(EventHandler *handler)
+{
+ return false; /* stop event loop */
+}
+
static void event_poll_init(EventPoll *poll)
{
/* Create epoll file descriptor */
@@ -24,35 +47,29 @@ static void event_poll_init(EventPoll *poll)
fprintf(stderr, "epoll_create1 failed: %m\n");
exit(1);
}
+
+ /* Set up stop notifier */
+ if (event_notifier_init(&poll->stop_notifier, 0) < 0) {
+ fprintf(stderr, "failed to init stop notifier\n");
+ exit(1);
+ }
+ event_poll_add(poll, &poll->stop_handler,
+ &poll->stop_notifier, handle_stop);
}
static void event_poll_cleanup(EventPoll *poll)
{
+ event_notifier_cleanup(&poll->stop_notifier);
close(poll->epoll_fd);
poll->epoll_fd = -1;
}
-/* Add an event notifier and its callback for polling */
-static void event_poll_add(EventPoll *poll, EventHandler *handler, EventNotifier *notifier, EventCallback *callback)
-{
- struct epoll_event event = {
- .events = EPOLLIN,
- .data.ptr = handler,
- };
- handler->notifier = notifier;
- handler->callback = callback;
- if (epoll_ctl(poll->epoll_fd, EPOLL_CTL_ADD, event_notifier_get_fd(notifier), &event) != 0) {
- fprintf(stderr, "failed to add event handler to epoll: %m\n");
- exit(1);
- }
-}
-
/* Block until the next event and invoke its callback
*
* Signals must be masked, EINTR should never happen. This is true for QEMU
* threads.
*/
-static void event_poll(EventPoll *poll)
+static bool event_poll(EventPoll *poll)
{
EventHandler *handler;
struct epoll_event event;
@@ -73,7 +90,27 @@ static void event_poll(EventPoll *poll)
event_notifier_test_and_clear(handler->notifier);
/* Handle the event */
- handler->callback(handler);
+ return handler->callback(handler);
+}
+
+static void event_poll_run(EventPoll *poll)
+{
+ while (event_poll(poll)) {
+ /* do nothing */
+ }
+}
+
+/* Stop the event_poll_run() loop
+ *
+ * This function can be used from another thread.
+ */
+static void event_poll_stop(EventPoll *poll)
+{
+ uint64_t dummy = 1;
+ int eventfd = event_notifier_get_fd(&poll->stop_notifier);
+ ssize_t unused __attribute__((unused));
+
+ unused = write(eventfd, &dummy, sizeof dummy);
}
#endif /* EVENT_POLL_H */
@@ -3,10 +3,10 @@
typedef struct {
int fd; /* file descriptor */
- unsigned int maxreqs; /* max length of freelist and queue */
+ unsigned int max_reqs; /* max length of freelist and queue */
io_context_t io_ctx; /* Linux AIO context */
- EventNotifier notifier; /* Linux AIO eventfd */
+ EventNotifier io_notifier; /* Linux AIO eventfd */
/* Requests can complete in any order so a free list is necessary to manage
* available iocbs.
@@ -19,25 +19,28 @@ typedef struct {
unsigned int queue_idx;
} IOQueue;
-static void ioq_init(IOQueue *ioq, int fd, unsigned int maxreqs)
+static void ioq_init(IOQueue *ioq, int fd, unsigned int max_reqs)
{
+ int rc;
+
ioq->fd = fd;
- ioq->maxreqs = maxreqs;
+ ioq->max_reqs = max_reqs;
- if (io_setup(maxreqs, &ioq->io_ctx) != 0) {
- fprintf(stderr, "ioq io_setup failed\n");
+ memset(&ioq->io_ctx, 0, sizeof ioq->io_ctx);
+ if ((rc = io_setup(max_reqs, &ioq->io_ctx)) != 0) {
+ fprintf(stderr, "ioq io_setup failed %d\n", rc);
exit(1);
}
- if (event_notifier_init(&ioq->notifier, 0) != 0) {
- fprintf(stderr, "ioq io event notifier creation failed\n");
+ if ((rc = event_notifier_init(&ioq->io_notifier, 0)) != 0) {
+ fprintf(stderr, "ioq io event notifier creation failed %d\n", rc);
exit(1);
}
- ioq->freelist = g_malloc0(sizeof ioq->freelist[0] * maxreqs);
+ ioq->freelist = g_malloc0(sizeof ioq->freelist[0] * max_reqs);
ioq->freelist_idx = 0;
- ioq->queue = g_malloc0(sizeof ioq->queue[0] * maxreqs);
+ ioq->queue = g_malloc0(sizeof ioq->queue[0] * max_reqs);
ioq->queue_idx = 0;
}
@@ -46,13 +49,13 @@ static void ioq_cleanup(IOQueue *ioq)
g_free(ioq->freelist);
g_free(ioq->queue);
- event_notifier_cleanup(&ioq->notifier);
+ event_notifier_cleanup(&ioq->io_notifier);
io_destroy(ioq->io_ctx);
}
static EventNotifier *ioq_get_notifier(IOQueue *ioq)
{
- return &ioq->notifier;
+ return &ioq->io_notifier;
}
static struct iocb *ioq_get_iocb(IOQueue *ioq)
@@ -63,18 +66,19 @@ static struct iocb *ioq_get_iocb(IOQueue *ioq)
}
struct iocb *iocb = ioq->freelist[--ioq->freelist_idx];
ioq->queue[ioq->queue_idx++] = iocb;
+ return iocb;
}
-static __attribute__((unused)) void ioq_put_iocb(IOQueue *ioq, struct iocb *iocb)
+static void ioq_put_iocb(IOQueue *ioq, struct iocb *iocb)
{
- if (unlikely(ioq->freelist_idx == ioq->maxreqs)) {
+ if (unlikely(ioq->freelist_idx == ioq->max_reqs)) {
fprintf(stderr, "ioq overflow\n");
exit(1);
}
ioq->freelist[ioq->freelist_idx++] = iocb;
}
-static __attribute__((unused)) void ioq_rdwr(IOQueue *ioq, bool read, struct iovec *iov, unsigned int count, long long offset)
+static struct iocb *ioq_rdwr(IOQueue *ioq, bool read, struct iovec *iov, unsigned int count, long long offset)
{
struct iocb *iocb = ioq_get_iocb(ioq);
@@ -83,22 +87,45 @@ static __attribute__((unused)) void ioq_rdwr(IOQueue *ioq, bool read, struct iov
} else {
io_prep_pwritev(iocb, ioq->fd, iov, count, offset);
}
- io_set_eventfd(iocb, event_notifier_get_fd(&ioq->notifier));
+ io_set_eventfd(iocb, event_notifier_get_fd(&ioq->io_notifier));
+ return iocb;
}
-static __attribute__((unused)) void ioq_fdsync(IOQueue *ioq)
+static struct iocb *ioq_fdsync(IOQueue *ioq)
{
struct iocb *iocb = ioq_get_iocb(ioq);
io_prep_fdsync(iocb, ioq->fd);
- io_set_eventfd(iocb, event_notifier_get_fd(&ioq->notifier));
+ io_set_eventfd(iocb, event_notifier_get_fd(&ioq->io_notifier));
+ return iocb;
}
-static __attribute__((unused)) int ioq_submit(IOQueue *ioq)
+static int ioq_submit(IOQueue *ioq)
{
int rc = io_submit(ioq->io_ctx, ioq->queue_idx, ioq->queue);
ioq->queue_idx = 0; /* reset */
return rc;
}
+typedef void IOQueueCompletion(struct iocb *iocb, ssize_t ret, void *opaque);
+static int ioq_run_completion(IOQueue *ioq, IOQueueCompletion *completion, void *opaque)
+{
+ struct io_event events[ioq->max_reqs];
+ int nevents, i;
+
+ nevents = io_getevents(ioq->io_ctx, 0, ioq->max_reqs, events, NULL);
+ if (unlikely(nevents < 0)) {
+ fprintf(stderr, "io_getevents failed %d\n", nevents);
+ exit(1);
+ }
+
+ for (i = 0; i < nevents; i++) {
+ ssize_t ret = ((uint64_t)events[i].res2 << 32) | events[i].res;
+
+ completion(events[i].obj, ret, opaque);
+ ioq_put_iocb(ioq, events[i].obj);
+ }
+ return nevents;
+}
+
#endif /* IO_QUEUE_H */
@@ -56,8 +56,8 @@ static void vring_setup(Vring *vring, VirtIODevice *vdev, int n)
vring_init(&vring->vr, virtio_queue_get_num(vdev, n),
phys_to_host(vring, virtio_queue_get_ring_addr(vdev, n)), 4096);
- vring->last_avail_idx = vring->vr.avail->idx;
- vring->last_used_idx = vring->vr.used->idx;
+ vring->last_avail_idx = 0;
+ vring->last_used_idx = 0;
fprintf(stderr, "vring physical=%#lx desc=%p avail=%p used=%p\n",
virtio_queue_get_ring_addr(vdev, n),
@@ -176,7 +176,7 @@ static unsigned int vring_pop(Vring *vring,
*
* Stolen from linux-2.6/drivers/vhost/vhost.c.
*/
-static __attribute__((unused)) void vring_push(Vring *vring, unsigned int head, int len)
+static void vring_push(Vring *vring, unsigned int head, int len)
{
struct vring_used_elem *used;
@@ -29,8 +29,13 @@ enum {
REQ_MAX = VRING_MAX / 2, /* maximum number of requests in the vring */
};
-typedef struct VirtIOBlock
-{
+typedef struct {
+ struct iocb iocb; /* Linux AIO control block */
+ unsigned char *status; /* virtio block status code */
+ unsigned int head; /* vring descriptor index */
+} VirtIOBlockRequest;
+
+typedef struct {
VirtIODevice vdev;
BlockDriverState *bs;
VirtQueue *vq;
@@ -44,11 +49,12 @@ typedef struct VirtIOBlock
Vring vring; /* virtqueue vring */
- IOQueue ioqueue; /* Linux AIO queue (should really be per dataplane thread) */
-
EventPoll event_poll; /* event poller */
EventHandler io_handler; /* Linux AIO completion handler */
EventHandler notify_handler; /* virtqueue notify handler */
+
+ IOQueue ioqueue; /* Linux AIO queue (should really be per dataplane thread) */
+ VirtIOBlockRequest requests[REQ_MAX]; /* pool of requests, managed by the queue */
} VirtIOBlock;
static VirtIOBlock *to_virtio_blk(VirtIODevice *vdev)
@@ -56,12 +62,40 @@ static VirtIOBlock *to_virtio_blk(VirtIODevice *vdev)
return (VirtIOBlock *)vdev;
}
-static void handle_io(EventHandler *handler)
+static void complete_request(struct iocb *iocb, ssize_t ret, void *opaque)
{
- fprintf(stderr, "io completion happened\n");
+ VirtIOBlock *s = opaque;
+ VirtIOBlockRequest *req = container_of(iocb, VirtIOBlockRequest, iocb);
+ int len;
+
+ if (likely(ret >= 0)) {
+ *req->status = VIRTIO_BLK_S_OK;
+ len = ret;
+ } else {
+ *req->status = VIRTIO_BLK_S_IOERR;
+ len = 0;
+ }
+
+ /* According to the virtio specification len should be the number of bytes
+ * written to, but for virtio-blk it seems to be the number of bytes
+ * transferred plus the status bytes.
+ */
+    vring_push(&s->vring, req->head, len + sizeof *req->status);
}
-static void process_request(struct iovec iov[], unsigned int out_num, unsigned int in_num)
+static bool handle_io(EventHandler *handler)
+{
+ VirtIOBlock *s = container_of(handler, VirtIOBlock, io_handler);
+
+ if (ioq_run_completion(&s->ioqueue, complete_request, s) > 0) {
+ /* TODO is this thread-safe and can it be done faster? */
+ virtio_irq(s->vq);
+ }
+
+ return true;
+}
+
+static void process_request(IOQueue *ioq, struct iovec iov[], unsigned int out_num, unsigned int in_num, unsigned int head)
{
/* Virtio block requests look like this: */
struct virtio_blk_outhdr *outhdr; /* iov[0] */
@@ -78,11 +112,54 @@ static void process_request(struct iovec iov[], unsigned int out_num, unsigned i
outhdr = iov[0].iov_base;
inhdr = iov[out_num + in_num - 1].iov_base;
+ /*
fprintf(stderr, "virtio-blk request type=%#x sector=%#lx\n",
outhdr->type, outhdr->sector);
+ */
+
+ if (unlikely(outhdr->type & ~(VIRTIO_BLK_T_OUT | VIRTIO_BLK_T_FLUSH))) {
+ fprintf(stderr, "virtio-blk unsupported request type %#x\n", outhdr->type);
+ exit(1);
+ }
+
+ struct iocb *iocb;
+ switch (outhdr->type & (VIRTIO_BLK_T_OUT | VIRTIO_BLK_T_FLUSH)) {
+ case VIRTIO_BLK_T_IN:
+ if (unlikely(out_num != 1)) {
+ fprintf(stderr, "virtio-blk invalid read request\n");
+ exit(1);
+ }
+ iocb = ioq_rdwr(ioq, true, &iov[1], in_num - 1, outhdr->sector * 512UL); /* TODO is it always 512? */
+ break;
+
+ case VIRTIO_BLK_T_OUT:
+ if (unlikely(in_num != 1)) {
+ fprintf(stderr, "virtio-blk invalid write request\n");
+ exit(1);
+ }
+ iocb = ioq_rdwr(ioq, false, &iov[1], out_num - 1, outhdr->sector * 512UL); /* TODO is it always 512? */
+ break;
+
+ case VIRTIO_BLK_T_FLUSH:
+ if (unlikely(in_num != 1 || out_num != 1)) {
+ fprintf(stderr, "virtio-blk invalid flush request\n");
+ exit(1);
+ }
+ iocb = ioq_fdsync(ioq);
+ break;
+
+ default:
+ fprintf(stderr, "virtio-blk multiple request type bits set\n");
+ exit(1);
+ }
+
+ /* Fill in virtio block metadata needed for completion */
+ VirtIOBlockRequest *req = container_of(iocb, VirtIOBlockRequest, iocb);
+ req->head = head;
+ req->status = &inhdr->status;
}
-static void handle_notify(EventHandler *handler)
+static bool handle_notify(EventHandler *handler)
{
VirtIOBlock *s = container_of(handler, VirtIOBlock, notify_handler);
@@ -114,19 +191,29 @@ static void handle_notify(EventHandler *handler)
break; /* no more requests */
}
- fprintf(stderr, "head=%u out_num=%u in_num=%u\n", head, out_num, in_num);
+ /*
+ fprintf(stderr, "out_num=%u in_num=%u head=%u\n", out_num, in_num, head);
+ */
- process_request(iov, out_num, in_num);
+ process_request(&s->ioqueue, iov, out_num, in_num, head);
}
+
+ /* Submit requests, if any */
+ if (likely(iov != iovec)) {
+ if (unlikely(ioq_submit(&s->ioqueue) < 0)) {
+ fprintf(stderr, "ioq_submit failed\n");
+ exit(1);
+ }
+ }
+
+ return true;
}
static void *data_plane_thread(void *opaque)
{
VirtIOBlock *s = opaque;
- for (;;) {
- event_poll(&s->event_poll);
- }
+ event_poll_run(&s->event_poll);
return NULL;
}
@@ -140,10 +227,13 @@ static int get_raw_posix_fd_hack(VirtIOBlock *s)
static void data_plane_start(VirtIOBlock *s)
{
+ int i;
+
vring_setup(&s->vring, &s->vdev, 0);
event_poll_init(&s->event_poll);
+ /* Set up virtqueue notify */
if (s->vdev.binding->set_host_notifier(s->vdev.binding_opaque, 0, true) != 0) {
fprintf(stderr, "virtio-blk failed to set host notifier, ensure -enable-kvm is set\n");
exit(1);
@@ -152,8 +242,11 @@ static void data_plane_start(VirtIOBlock *s)
virtio_queue_get_host_notifier(s->vq),
handle_notify);
+ /* Set up ioqueue */
ioq_init(&s->ioqueue, get_raw_posix_fd_hack(s), REQ_MAX);
- /* TODO populate ioqueue freelist */
+ for (i = 0; i < ARRAY_SIZE(s->requests); i++) {
+ ioq_put_iocb(&s->ioqueue, &s->requests[i].iocb);
+ }
event_poll_add(&s->event_poll, &s->io_handler, ioq_get_notifier(&s->ioqueue), handle_io);
qemu_thread_create(&s->data_plane_thread, data_plane_thread, s, QEMU_THREAD_JOINABLE);
@@ -165,7 +258,9 @@ static void data_plane_stop(VirtIOBlock *s)
{
s->data_plane_started = false;
- /* TODO stop data plane thread */
+ /* Tell data plane thread to stop and then wait for it to return */
+ event_poll_stop(&s->event_poll);
+ pthread_join(s->data_plane_thread.thread, NULL);
ioq_cleanup(&s->ioqueue);
@@ -183,6 +278,10 @@ static void virtio_blk_set_status(VirtIODevice *vdev, uint8_t val)
return;
}
+ /*
+ fprintf(stderr, "virtio_blk_set_status %#x\n", val);
+ */
+
if (val & VIRTIO_CONFIG_S_DRIVER_OK) {
data_plane_start(s);
} else {
@@ -190,11 +289,29 @@ static void virtio_blk_set_status(VirtIODevice *vdev, uint8_t val)
}
}
+static void virtio_blk_reset(VirtIODevice *vdev)
+{
+ virtio_blk_set_status(vdev, 0);
+}
+
static void virtio_blk_handle_output(VirtIODevice *vdev, VirtQueue *vq)
{
- fprintf(stderr, "virtio_blk_handle_output: should never get here, "
- "data plane thread should process requests\n");
- exit(1);
+ VirtIOBlock *s = to_virtio_blk(vdev);
+
+ if (s->data_plane_started) {
+ fprintf(stderr, "virtio_blk_handle_output: should never get here, "
+ "data plane thread should process requests\n");
+ exit(1);
+ }
+
+ /* Linux seems to notify before the driver comes up. This needs more
+ * investigation. Just use a hack for now.
+ */
+ virtio_blk_set_status(vdev, VIRTIO_CONFIG_S_DRIVER_OK); /* start the thread */
+
+ /* Now kick the thread */
+ uint64_t dummy = 1;
+ ssize_t unused __attribute__((unused)) = write(event_notifier_get_fd(virtio_queue_get_host_notifier(s->vq)), &dummy, sizeof dummy);
}
/* coalesce internal state, copy to pci i/o region 0
@@ -273,6 +390,7 @@ VirtIODevice *virtio_blk_init(DeviceState *dev, BlockConf *conf,
s->vdev.get_config = virtio_blk_update_config;
s->vdev.get_features = virtio_blk_get_features;
s->vdev.set_status = virtio_blk_set_status;
+ s->vdev.reset = virtio_blk_reset;
s->bs = conf->bs;
s->conf = conf;
s->serial = *serial;
Signed-off-by: Stefan Hajnoczi <stefanha@linux.vnet.ibm.com> --- hw/dataplane/event-poll.h | 79 ++++++++++++++++------- hw/dataplane/ioq.h | 65 +++++++++++++------ hw/dataplane/vring.h | 6 +- hw/virtio-blk.c | 154 +++++++++++++++++++++++++++++++++++++++------ 4 files changed, 243 insertions(+), 61 deletions(-)