@@ -12,6 +12,7 @@ block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
block-obj-$(CONFIG_POSIX) += compatfd.o
+block-obj-$(CONFIG_RBD) += qemu-thread.o
block-nested-y += raw.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vvfat.o
block-nested-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o
@@ -24,7 +24,7 @@
#include <rados/librados.h>
#include <signal.h>
-
+#include <qemu-thread.h>
int eventfd(unsigned int initval, int flags);
@@ -50,6 +50,7 @@ int eventfd(unsigned int initval, int flags);
*/
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
+#define MAX_QUEUE_SIZE 33554432 // 32MB
typedef struct RBDAIOCB {
BlockDriverAIOCB common;
@@ -82,6 +83,9 @@ typedef struct BDRVRBDState {
uint64_t objsize;
int qemu_aio_count;
int read_only;
+ uint64_t queuesize;
+ QemuMutex *queue_mutex;
+ QemuCond *queue_threshold;
} BDRVRBDState;
typedef struct rbd_obj_header_ondisk RbdHeader1;
@@ -487,6 +491,13 @@ static int rbd_open(BlockDriverState *bs, const char *filename, int flags)
s->read_only = (snap != NULL);
+ s->queuesize = 0;
+
+ s->queue_mutex = qemu_malloc(sizeof(QemuMutex));
+ qemu_mutex_init(s->queue_mutex);
+ s->queue_threshold = qemu_malloc(sizeof(QemuCond));
+ qemu_cond_init(s->queue_threshold);
+
s->efd = eventfd(0, 0);
if (s->efd < 0) {
error_report("error opening eventfd");
@@ -523,6 +534,12 @@ static void rbd_close(BlockDriverState *bs)
{
BDRVRBDState *s = bs->opaque;
+ // The following do not exist in qemu:
+ // qemu_cond_destroy(s->queue_threshold);
+ // qemu_mutex_destroy(s->queue_mutex);
+ qemu_free(s->queue_threshold);
+ qemu_free(s->queue_mutex);
+
rados_close_pool(s->header_pool);
rados_close_pool(s->pool);
rados_deinitialize();
@@ -613,6 +630,12 @@ static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
int i;
acb->aiocnt--;
+ acb->s->queuesize -= rcb->segsize;
+ if (acb->s->queuesize+rcb->segsize > MAX_QUEUE_SIZE && acb->s->queuesize <= MAX_QUEUE_SIZE) {
+ qemu_mutex_lock(acb->s->queue_mutex);
+ qemu_cond_signal(acb->s->queue_threshold);
+ qemu_mutex_unlock(acb->s->queue_mutex);
+ }
r = rados_aio_get_return_value(c);
rados_aio_release(c);
if (acb->write) {
@@ -735,6 +758,14 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
rcb->segsize = segsize;
rcb->buf = buf;
+ while (s->queuesize > MAX_QUEUE_SIZE) {
+ qemu_mutex_lock(s->queue_mutex);
+ qemu_cond_wait(s->queue_threshold, s->queue_mutex);
+ qemu_mutex_unlock(s->queue_mutex);
+ }
+
+ s->queuesize += segsize;
+
if (write) {
rados_aio_create_completion(rcb, NULL,
(rados_callback_t) rbd_finish_aiocb,