diff mbox

[Qemu-devel] Re: [PATCH] ceph/rbd block driver for qemu-kvm (v3)

Message ID 20100713192338.GA25126@sir.home (mailing list archive)
State New, archived
Headers show

Commit Message

Christian Brunner July 13, 2010, 7:23 p.m. UTC
None
diff mbox

Patch

diff --git a/block/rbd.c b/block/rbd.c
index 10daf20..c6693d7 100644
--- a/block/rbd.c
+++ b/block/rbd.c
@@ -24,7 +24,7 @@ 
 #include <rados/librados.h>
 
 #include <signal.h>
-
+#include <pthread.h>
 
 int eventfd(unsigned int initval, int flags);
 
@@ -50,6 +50,7 @@  int eventfd(unsigned int initval, int flags);
  */
 
 #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
+#define MAX_QUEUE_SIZE 33554432 // 32MB
 
 typedef struct RBDAIOCB {
     BlockDriverAIOCB common;
@@ -79,6 +80,9 @@  typedef struct BDRVRBDState {
     uint64_t size;
     uint64_t objsize;
     int qemu_aio_count;
+    uint64_t queuesize;
+    pthread_mutex_t *queue_mutex;
+    pthread_cond_t *queue_threshold;
 } BDRVRBDState;
 
 typedef struct rbd_obj_header_ondisk RbdHeader1;
@@ -334,6 +338,12 @@  static int rbd_open(BlockDriverState *bs, const char *filename, int flags)
     le64_to_cpus((uint64_t *) & header->image_size);
     s->size = header->image_size;
     s->objsize = 1 << header->options.order;
+    s->queuesize = 0;
+
+    s->queue_mutex = qemu_malloc(sizeof(pthread_mutex_t));
+    pthread_mutex_init(s->queue_mutex, NULL);
+    s->queue_threshold = qemu_malloc(sizeof(pthread_cond_t));
+    pthread_cond_init (s->queue_threshold, NULL);
 
     s->efd = eventfd(0, 0);
     if (s->efd < 0) {
@@ -356,6 +366,11 @@  static void rbd_close(BlockDriverState *bs)
 {
     BDRVRBDState *s = bs->opaque;
 
+    pthread_cond_destroy(s->queue_threshold);
+    qemu_free(s->queue_threshold);
+    pthread_mutex_destroy(s->queue_mutex);
+    qemu_free(s->queue_mutex);
+
     rados_close_pool(s->pool);
     rados_deinitialize();
 }
@@ -443,6 +458,12 @@  static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
     int i;
 
     acb->aiocnt--;
+    acb->s->queuesize -= rcb->segsize;
+    if (acb->s->queuesize+rcb->segsize > MAX_QUEUE_SIZE && acb->s->queuesize <= MAX_QUEUE_SIZE) {
+        pthread_mutex_lock(acb->s->queue_mutex);
+        pthread_cond_signal(acb->s->queue_threshold);
+        pthread_mutex_unlock(acb->s->queue_mutex);
+    }
     r = rados_aio_get_return_value(c);
     rados_aio_release(c);
     if (acb->write) {
@@ -560,6 +581,14 @@  static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
         rcb->segsize = segsize;
         rcb->buf = buf;
 
+        while  (s->queuesize > MAX_QUEUE_SIZE) {
+            pthread_mutex_lock(s->queue_mutex);
+            pthread_cond_wait(s->queue_threshold, s->queue_mutex);
+            pthread_mutex_unlock(s->queue_mutex);
+        }
+
+        s->queuesize += segsize;
+
         if (write) {
             rados_aio_create_completion(rcb, NULL,
                                         (rados_callback_t) rbd_finish_aiocb,