diff mbox

qemu-kvm/rbd: add queueing delay based on queuesize (using use qemu_mutex_* and qemu_cond_*)

Message ID 20100714133545.GA21617@bambus (mailing list archive)
State New, archived
Headers show

Commit Message

Christian Brunner July 14, 2010, 1:35 p.m. UTC
None
diff mbox

Patch

diff --git a/Makefile.objs b/Makefile.objs
index 56a13c1..e1b8513 100644
--- a/Makefile.objs
+++ b/Makefile.objs
@@ -12,6 +12,7 @@  block-obj-y += nbd.o block.o aio.o aes.o osdep.o qemu-config.o
 block-obj-$(CONFIG_POSIX) += posix-aio-compat.o
 block-obj-$(CONFIG_LINUX_AIO) += linux-aio.o
 block-obj-$(CONFIG_POSIX) += compatfd.o
+block-obj-$(CONFIG_RBD) += qemu-thread.o
 
 block-nested-y += raw.o cow.o qcow.o vdi.o vmdk.o cloop.o dmg.o bochs.o vpc.o vvfat.o
 block-nested-y += qcow2.o qcow2-refcount.o qcow2-cluster.o qcow2-snapshot.o
diff --git a/block/rbd.c b/block/rbd.c
index e7d4083..01786da 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -24,7 +24,7 @@ 
 #include <rados/librados.h>
 
 #include <signal.h>
-
+#include <qemu-thread.h>
 
 int eventfd(unsigned int initval, int flags);
 
@@ -50,6 +50,7 @@  int eventfd(unsigned int initval, int flags);
  */
 
 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
+#define MAX_QUEUE_SIZE 33554432 // 32MB
 
 typedef struct RBDAIOCB {
     BlockDriverAIOCB common;
@@ -82,6 +83,9 @@  typedef struct BDRVRBDState {
     uint64_t objsize;
     int qemu_aio_count;
     int read_only;
+    uint64_t queuesize;
+    QemuMutex *queue_mutex;
+    QemuCond *queue_threshold;
 } BDRVRBDState;
 
 typedef struct rbd_obj_header_ondisk RbdHeader1;
@@ -487,6 +491,13 @@  static int rbd_open(BlockDriverState *bs, const char *filename, int flags)
 
     s->read_only = (snap != NULL);
 
+    s->queuesize = 0;
+
+    s->queue_mutex = qemu_malloc(sizeof(QemuMutex));
+    qemu_mutex_init(s->queue_mutex);
+    s->queue_threshold = qemu_malloc(sizeof(QemuCond));
+    qemu_cond_init(s->queue_threshold);
+
     s->efd = eventfd(0, 0);
     if (s->efd < 0) {
         error_report("error opening eventfd");
@@ -523,6 +534,12 @@  static void rbd_close(BlockDriverState *bs)
 {
     BDRVRBDState *s = bs->opaque;
 
+    // The following do not exist in qemu:
+    // qemu_cond_destroy(s->queue_threshold);
+    // qemu_mutex_destroy(s->queue_mutex);
+    qemu_free(s->queue_threshold);
+    qemu_free(s->queue_mutex);
+
     rados_close_pool(s->header_pool);
     rados_close_pool(s->pool);
     rados_deinitialize();
@@ -613,6 +630,12 @@  static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
     int i;
 
     acb->aiocnt--;
+    acb->s->queuesize -= rcb->segsize;
+    if (acb->s->queuesize+rcb->segsize > MAX_QUEUE_SIZE && acb->s->queuesize <= MAX_QUEUE_SIZE) {
+        qemu_mutex_lock(acb->s->queue_mutex);
+        qemu_cond_signal(acb->s->queue_threshold);
+        qemu_mutex_unlock(acb->s->queue_mutex);
+    }
     r = rados_aio_get_return_value(c);
     rados_aio_release(c);
     if (acb->write) {
@@ -735,6 +758,14 @@  static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
         rcb->segsize = segsize;
         rcb->buf = buf;
 
+        while  (s->queuesize > MAX_QUEUE_SIZE) {
+            qemu_mutex_lock(s->queue_mutex);
+            qemu_cond_wait(s->queue_threshold, s->queue_mutex);
+            qemu_mutex_unlock(s->queue_mutex);
+        }
+
+        s->queuesize += segsize;
+
         if (write) {
             rados_aio_create_completion(rcb, NULL,
                                         (rados_callback_t) rbd_finish_aiocb,