@@ -2,10 +2,12 @@
* RDMA protocol and interfaces
*
* Copyright IBM, Corp. 2010-2013
+ * Copyright Red Hat, Inc. 2015-2016
*
* Authors:
* Michael R. Hines <mrhines@us.ibm.com>
* Jiuxing Liu <jl@us.ibm.com>
+ * Daniel P. Berrange <berrange@redhat.com>
*
* This work is licensed under the terms of the GNU GPL, version 2 or
* later. See the COPYING file in the top-level directory.
@@ -372,14 +374,20 @@ typedef struct RDMAContext {
GHashTable *blockmap;
} RDMAContext;
-/*
- * Interface to the rest of the migration call stack.
- */
-typedef struct QEMUFileRDMA {
+#define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
+#define QIO_CHANNEL_RDMA(obj) \
+ OBJECT_CHECK(QIOChannelRDMA, (obj), TYPE_QIO_CHANNEL_RDMA)
+
+typedef struct QIOChannelRDMA QIOChannelRDMA;
+
+
+struct QIOChannelRDMA {
+ QIOChannel parent;
RDMAContext *rdma;
+ QEMUFile *file;
size_t len;
- void *file;
-} QEMUFileRDMA;
+ bool blocking; /* XXX we don't actually honour this yet */
+};
/*
* Main structure for IB Send/Recv control messages.
@@ -2516,15 +2524,19 @@ static void *qemu_rdma_data_init(const char *host_port, Error **errp)
* SEND messages for control only.
* VM's ram is handled with regular RDMA messages.
*/
-static ssize_t qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
- int64_t pos, size_t size)
-{
- QEMUFileRDMA *r = opaque;
- QEMUFile *f = r->file;
- RDMAContext *rdma = r->rdma;
- size_t remaining = size;
- uint8_t * data = (void *) buf;
+static ssize_t qio_channel_rdma_writev(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int *fds,
+ size_t nfds,
+ Error **errp)
+{
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+ QEMUFile *f = rioc->file;
+ RDMAContext *rdma = rioc->rdma;
int ret;
+ ssize_t done = 0;
+ size_t i;
CHECK_ERROR_STATE();
@@ -2538,27 +2550,31 @@ static ssize_t qemu_rdma_put_buffer(void *opaque, const uint8_t *buf,
return ret;
}
- while (remaining) {
- RDMAControlHeader head;
+ for (i = 0; i < niov; i++) {
+ size_t remaining = iov[i].iov_len;
+ uint8_t * data = (void *)iov[i].iov_base;
+ while (remaining) {
+ RDMAControlHeader head;
- r->len = MIN(remaining, RDMA_SEND_INCREMENT);
- remaining -= r->len;
+ rioc->len = MIN(remaining, RDMA_SEND_INCREMENT);
+ remaining -= rioc->len;
- /* Guaranteed to fit due to RDMA_SEND_INCREMENT MIN above */
- head.len = (uint32_t)r->len;
- head.type = RDMA_CONTROL_QEMU_FILE;
+ head.len = rioc->len;
+ head.type = RDMA_CONTROL_QEMU_FILE;
- ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
+ ret = qemu_rdma_exchange_send(rdma, &head, data, NULL, NULL, NULL);
- if (ret < 0) {
- rdma->error_state = ret;
- return ret;
- }
+ if (ret < 0) {
+ rdma->error_state = ret;
+ return ret;
+ }
- data += r->len;
+ data += rioc->len;
+ done += rioc->len;
+ }
}
- return size;
+ return done;
}
static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
@@ -2583,41 +2599,74 @@ static size_t qemu_rdma_fill(RDMAContext *rdma, uint8_t *buf,
* RDMA links don't use bytestreams, so we have to
* return bytes to QEMUFile opportunistically.
*/
-static ssize_t qemu_rdma_get_buffer(void *opaque, uint8_t *buf,
- int64_t pos, size_t size)
-{
- QEMUFileRDMA *r = opaque;
- RDMAContext *rdma = r->rdma;
+static ssize_t qio_channel_rdma_readv(QIOChannel *ioc,
+ const struct iovec *iov,
+ size_t niov,
+ int **fds,
+ size_t *nfds,
+ Error **errp)
+{
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+ RDMAContext *rdma = rioc->rdma;
RDMAControlHeader head;
int ret = 0;
+ ssize_t i;
+ size_t done = 0;
CHECK_ERROR_STATE();
- /*
- * First, we hold on to the last SEND message we
- * were given and dish out the bytes until we run
- * out of bytes.
- */
- r->len = qemu_rdma_fill(r->rdma, buf, size, 0);
- if (r->len) {
- return r->len;
- }
+ for (i = 0; i < niov; i++) {
+ size_t want = iov[i].iov_len;
+ uint8_t *data = (void *)iov[i].iov_base;
- /*
- * Once we run out, we block and wait for another
- * SEND message to arrive.
- */
- ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
+ /*
+ * First, we hold on to the last SEND message we
+ * were given and dish out the bytes until we run
+ * out of bytes.
+ */
+ ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
+ done += ret;
+ want -= ret;
+ /* Got what we needed, so go to next iovec */
+ if (want == 0) {
+ continue;
+ }
- if (ret < 0) {
- rdma->error_state = ret;
- return ret;
- }
+ /* If we got any data so far, then don't wait
+ * for more, just return what we have */
+ if (done > 0) {
+ break;
+ }
- /*
- * SEND was received with new bytes, now try again.
- */
- return qemu_rdma_fill(r->rdma, buf, size, 0);
+
+ /* We've got nothing at all, so lets wait for
+ * more to arrive
+ */
+ ret = qemu_rdma_exchange_recv(rdma, &head, RDMA_CONTROL_QEMU_FILE);
+
+ if (ret < 0) {
+ rdma->error_state = ret;
+ return ret;
+ }
+
+ /*
+ * SEND was received with new bytes, now try again.
+ */
+ ret = qemu_rdma_fill(rioc->rdma, data, want, 0);
+ done += ret;
+ want -= ret;
+
+ /* Still didn't get enough, so lets just return */
+ if (want) {
+ if (done == 0) {
+ return QIO_CHANNEL_ERR_BLOCK;
+ } else {
+ break;
+ }
+ }
+ }
+ rioc->len = done;
+ return rioc->len;
}
/*
@@ -2644,15 +2693,122 @@ static int qemu_rdma_drain_cq(QEMUFile *f, RDMAContext *rdma)
return 0;
}
-static int qemu_rdma_close(void *opaque)
+
+static int qio_channel_rdma_set_blocking(QIOChannel *ioc,
+ bool blocking,
+ Error **errp)
+{
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+ /* XXX we should make readv/writev actually honour this :-) */
+ rioc->blocking = blocking;
+ return 0;
+}
+
+
+typedef struct QIOChannelRDMASource QIOChannelRDMASource;
+struct QIOChannelRDMASource {
+ GSource parent;
+ QIOChannelRDMA *rioc;
+ GIOCondition condition;
+};
+
+static gboolean
+qio_channel_rdma_source_prepare(GSource *source,
+ gint *timeout)
+{
+ QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
+ RDMAContext *rdma = rsource->rioc->rdma;
+ GIOCondition cond = 0;
+ *timeout = -1;
+
+ if (rdma->wr_data[0].control_len) {
+ cond |= G_IO_IN;
+ }
+ cond |= G_IO_OUT;
+
+ return cond & rsource->condition;
+}
+
+static gboolean
+qio_channel_rdma_source_check(GSource *source)
+{
+ QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
+ RDMAContext *rdma = rsource->rioc->rdma;
+ GIOCondition cond = 0;
+
+ if (rdma->wr_data[0].control_len) {
+ cond |= G_IO_IN;
+ }
+ cond |= G_IO_OUT;
+
+ return cond & rsource->condition;
+}
+
+static gboolean
+qio_channel_rdma_source_dispatch(GSource *source,
+ GSourceFunc callback,
+ gpointer user_data)
+{
+ QIOChannelFunc func = (QIOChannelFunc)callback;
+ QIOChannelRDMASource *rsource = (QIOChannelRDMASource *)source;
+ RDMAContext *rdma = rsource->rioc->rdma;
+ GIOCondition cond = 0;
+
+ if (rdma->wr_data[0].control_len) {
+ cond |= G_IO_IN;
+ }
+ cond |= G_IO_OUT;
+
+ return (*func)(QIO_CHANNEL(rsource->rioc),
+ (cond & rsource->condition),
+ user_data);
+}
+
+static void
+qio_channel_rdma_source_finalize(GSource *source)
+{
+ QIOChannelRDMASource *ssource = (QIOChannelRDMASource *)source;
+
+ object_unref(OBJECT(ssource->rioc));
+}
+
+GSourceFuncs qio_channel_rdma_source_funcs = {
+ qio_channel_rdma_source_prepare,
+ qio_channel_rdma_source_check,
+ qio_channel_rdma_source_dispatch,
+ qio_channel_rdma_source_finalize
+};
+
+static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
+ GIOCondition condition)
{
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+ QIOChannelRDMASource *ssource;
+ GSource *source;
+
+ source = g_source_new(&qio_channel_rdma_source_funcs,
+ sizeof(QIOChannelRDMASource));
+ ssource = (QIOChannelRDMASource *)source;
+
+ ssource->rioc = rioc;
+ object_ref(OBJECT(rioc));
+
+ ssource->condition = condition;
+
+ return source;
+}
+
+
+static int qio_channel_rdma_close(QIOChannel *ioc,
+ Error **errp)
+{
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
trace_qemu_rdma_close();
- QEMUFileRDMA *r = opaque;
- if (r->rdma) {
- qemu_rdma_cleanup(r->rdma);
- g_free(r->rdma);
+ if (rioc->rdma) {
+ qemu_rdma_cleanup(rioc->rdma);
+ g_free(rioc->rdma);
+ rioc->rdma = NULL;
}
- g_free(r);
return 0;
}
@@ -2694,8 +2850,8 @@ static size_t qemu_rdma_save_page(QEMUFile *f, void *opaque,
ram_addr_t block_offset, ram_addr_t offset,
size_t size, uint64_t *bytes_sent)
{
- QEMUFileRDMA *rfile = opaque;
- RDMAContext *rdma = rfile->rdma;
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
+ RDMAContext *rdma = rioc->rdma;
int ret;
CHECK_ERROR_STATE();
@@ -2949,8 +3105,8 @@ static int qemu_rdma_registration_handle(QEMUFile *f, void *opaque)
};
RDMAControlHeader blocks = { .type = RDMA_CONTROL_RAM_BLOCKS_RESULT,
.repeat = 1 };
- QEMUFileRDMA *rfile = opaque;
- RDMAContext *rdma = rfile->rdma;
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
+ RDMAContext *rdma = rioc->rdma;
RDMALocalBlocks *local = &rdma->local_ram_blocks;
RDMAControlHeader head;
RDMARegister *reg, *registers;
@@ -3205,9 +3361,10 @@ out:
* We've already built our local RAMBlock list, but not yet sent the list to
* the source.
*/
-static int rdma_block_notification_handle(QEMUFileRDMA *rfile, const char *name)
+static int
+rdma_block_notification_handle(QIOChannelRDMA *rioc, const char *name)
{
- RDMAContext *rdma = rfile->rdma;
+ RDMAContext *rdma = rioc->rdma;
int curr;
int found = -1;
@@ -3249,8 +3406,8 @@ static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
static int qemu_rdma_registration_start(QEMUFile *f, void *opaque,
uint64_t flags, void *data)
{
- QEMUFileRDMA *rfile = opaque;
- RDMAContext *rdma = rfile->rdma;
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
+ RDMAContext *rdma = rioc->rdma;
CHECK_ERROR_STATE();
@@ -3269,8 +3426,8 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
uint64_t flags, void *data)
{
Error *local_err = NULL, **errp = &local_err;
- QEMUFileRDMA *rfile = opaque;
- RDMAContext *rdma = rfile->rdma;
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(opaque);
+ RDMAContext *rdma = rioc->rdma;
RDMAControlHeader head = { .len = 0, .repeat = 1 };
int ret = 0;
@@ -3366,55 +3523,74 @@ err:
return ret;
}
-static int qemu_rdma_get_fd(void *opaque)
-{
- QEMUFileRDMA *rfile = opaque;
- RDMAContext *rdma = rfile->rdma;
-
- return rdma->comp_channel->fd;
-}
-
-static const QEMUFileOps rdma_read_ops = {
- .get_buffer = qemu_rdma_get_buffer,
- .get_fd = qemu_rdma_get_fd,
- .close = qemu_rdma_close,
-};
-
static const QEMUFileHooks rdma_read_hooks = {
.hook_ram_load = rdma_load_hook,
};
-static const QEMUFileOps rdma_write_ops = {
- .put_buffer = qemu_rdma_put_buffer,
- .close = qemu_rdma_close,
-};
-
static const QEMUFileHooks rdma_write_hooks = {
.before_ram_iterate = qemu_rdma_registration_start,
.after_ram_iterate = qemu_rdma_registration_stop,
.save_page = qemu_rdma_save_page,
};
-static void *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
+
+static void qio_channel_rdma_finalize(Object *obj)
+{
+ QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(obj);
+ if (rioc->rdma) {
+ qemu_rdma_cleanup(rioc->rdma);
+ g_free(rioc->rdma);
+ rioc->rdma = NULL;
+ }
+}
+
+static void qio_channel_rdma_class_init(ObjectClass *klass,
+ void *class_data G_GNUC_UNUSED)
+{
+ QIOChannelClass *ioc_klass = QIO_CHANNEL_CLASS(klass);
+
+ ioc_klass->io_writev = qio_channel_rdma_writev;
+ ioc_klass->io_readv = qio_channel_rdma_readv;
+ ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
+ ioc_klass->io_close = qio_channel_rdma_close;
+ ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
+}
+
+static const TypeInfo qio_channel_rdma_info = {
+ .parent = TYPE_QIO_CHANNEL,
+ .name = TYPE_QIO_CHANNEL_RDMA,
+ .instance_size = sizeof(QIOChannelRDMA),
+ .instance_finalize = qio_channel_rdma_finalize,
+ .class_init = qio_channel_rdma_class_init,
+};
+
+static void qio_channel_rdma_register_types(void)
+{
+ type_register_static(&qio_channel_rdma_info);
+}
+
+type_init(qio_channel_rdma_register_types);
+
+static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
{
- QEMUFileRDMA *r;
+ QIOChannelRDMA *rioc;
if (qemu_file_mode_is_not_valid(mode)) {
return NULL;
}
- r = g_new0(QEMUFileRDMA, 1);
- r->rdma = rdma;
+ rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
+ rioc->rdma = rdma;
if (mode[0] == 'w') {
- r->file = qemu_fopen_ops(r, &rdma_write_ops);
- qemu_file_set_hooks(r->file, &rdma_write_hooks);
+ rioc->file = qemu_fopen_channel_output(QIO_CHANNEL(rioc));
+ qemu_file_set_hooks(rioc->file, &rdma_write_hooks);
} else {
- r->file = qemu_fopen_ops(r, &rdma_read_ops);
- qemu_file_set_hooks(r->file, &rdma_read_hooks);
+ rioc->file = qemu_fopen_channel_input(QIO_CHANNEL(rioc));
+ qemu_file_set_hooks(rioc->file, &rdma_read_hooks);
}
- return r->file;
+ return rioc->file;
}
static void rdma_accept_incoming_migration(void *opaque)