@@ -50,6 +50,7 @@ int eventfd(unsigned int initval, int flags);
*/
#define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER)
+#define MAX_QUEUE_SIZE 33554432 // 32MB
typedef struct RBDAIOCB {
BlockDriverAIOCB common;
@@ -79,6 +80,7 @@ typedef struct BDRVRBDState {
uint64_t size;
uint64_t objsize;
int qemu_aio_count;
+ uint64_t queuesize;
} BDRVRBDState;
typedef struct rbd_obj_header_ondisk RbdHeader1;
@@ -334,6 +336,7 @@ static int rbd_open(BlockDriverState *bs, const char *filename, int flags)
le64_to_cpus((uint64_t *) & header->image_size);
s->size = header->image_size;
s->objsize = 1 << header->options.order;
+ s->queuesize = 0;
s->efd = eventfd(0, 0);
if (s->efd < 0) {
@@ -443,6 +446,7 @@ static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb)
int i;
acb->aiocnt--;
+ acb->s->queuesize -= rcb->segsize;
r = rados_aio_get_return_value(c);
rados_aio_release(c);
if (acb->write) {
@@ -560,6 +564,12 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs,
rcb->segsize = segsize;
rcb->buf = buf;
+ while (s->queuesize > MAX_QUEUE_SIZE) {
+ usleep(100);
+ }
+
+ s->queuesize += segsize;
+
if (write) {
rados_aio_create_completion(rcb, NULL,
(rados_callback_t) rbd_finish_aiocb,