From patchwork Tue Jul 13 19:23:38 2010 Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit X-Patchwork-Submitter: Christian Brunner X-Patchwork-Id: 111827 Received: from vger.kernel.org (vger.kernel.org [209.132.180.67]) by demeter.kernel.org (8.14.4/8.14.3) with ESMTP id o6DJNjQC030420 for ; Tue, 13 Jul 2010 19:23:45 GMT Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S1753551Ab0GMTXo (ORCPT ); Tue, 13 Jul 2010 15:23:44 -0400 Received: from mail-bw0-f46.google.com ([209.85.214.46]:39445 "EHLO mail-bw0-f46.google.com" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S1751042Ab0GMTXn (ORCPT ); Tue, 13 Jul 2010 15:23:43 -0400 Received: by bwz1 with SMTP id 1so487360bwz.19 for ; Tue, 13 Jul 2010 12:23:41 -0700 (PDT) Received: by 10.204.82.6 with SMTP id z6mr240947bkk.31.1279049021392; Tue, 13 Jul 2010 12:23:41 -0700 (PDT) Received: from sir.home (e181029245.adsl.alicedsl.de [85.181.29.245]) by mx.google.com with ESMTPS id s34sm25434087bkk.13.2010.07.13.12.23.40 (version=SSLv3 cipher=RC4-MD5); Tue, 13 Jul 2010 12:23:40 -0700 (PDT) Date: Tue, 13 Jul 2010 21:23:38 +0200 From: Christian Brunner To: Yehuda Sadeh Weinraub Cc: Kevin Wolf , Simone Gotti , ceph-devel@vger.kernel.org, qemu-devel@nongnu.org, kvm@vger.kernel.org Subject: Re: [Qemu-devel] Re: [PATCH] ceph/rbd block driver for qemu-kvm (v3) Message-ID: <20100713192338.GA25126@sir.home> References: <20100531193140.GA13993@chb-desktop> <4C1293B7.1060307@gmail.com> <4C1B45DB.4000502@redhat.com> MIME-Version: 1.0 Content-Disposition: inline In-Reply-To: User-Agent: Mutt/1.5.20 (2009-12-10) Sender: ceph-devel-owner@vger.kernel.org Precedence: bulk List-ID: X-Mailing-List: ceph-devel@vger.kernel.org X-Greylist: IP, sender and recipient auto-whitelisted, not delayed by milter-greylist-4.2.3 (demeter.kernel.org [140.211.167.41]); Tue, 13 Jul 2010 19:23:46 +0000 (UTC) diff --git a/block/rbd.c b/block/rbd.c index 10daf20..c6693d7 100644 --- a/block/rbd.c +++ b/block/rbd.c @@ -24,7 +24,7 @@ #include #include - +#include int eventfd(unsigned int initval, int flags); @@ -50,6 +50,7 @@ int eventfd(unsigned int initval, int flags); */ #define OBJ_MAX_SIZE (1UL << OBJ_DEFAULT_OBJ_ORDER) +#define MAX_QUEUE_SIZE 33554432 // 32MB typedef struct RBDAIOCB { BlockDriverAIOCB common; @@ -79,6 +80,9 @@ typedef struct BDRVRBDState { uint64_t size; uint64_t objsize; int qemu_aio_count; + uint64_t queuesize; + pthread_mutex_t *queue_mutex; + pthread_cond_t *queue_threshold; } BDRVRBDState; typedef struct rbd_obj_header_ondisk RbdHeader1; @@ -334,6 +338,12 @@ static int rbd_open(BlockDriverState *bs, const char *filename, int flags) le64_to_cpus((uint64_t *) & header->image_size); s->size = header->image_size; s->objsize = 1 << header->options.order; + s->queuesize = 0; + + s->queue_mutex = qemu_malloc(sizeof(pthread_mutex_t)); + pthread_mutex_init(s->queue_mutex, NULL); + s->queue_threshold = qemu_malloc(sizeof(pthread_cond_t)); + pthread_cond_init (s->queue_threshold, NULL); s->efd = eventfd(0, 0); if (s->efd < 0) { @@ -356,6 +366,11 @@ static void rbd_close(BlockDriverState *bs) { BDRVRBDState *s = bs->opaque; + pthread_cond_destroy(s->queue_threshold); + qemu_free(s->queue_threshold); + pthread_mutex_destroy(s->queue_mutex); + qemu_free(s->queue_mutex); + rados_close_pool(s->pool); rados_deinitialize(); } @@ -443,6 +458,12 @@ static void rbd_finish_aiocb(rados_completion_t c, RADOSCB *rcb) int i; acb->aiocnt--; + acb->s->queuesize -= rcb->segsize; + if (acb->s->queuesize+rcb->segsize > MAX_QUEUE_SIZE && acb->s->queuesize <= MAX_QUEUE_SIZE) { + pthread_mutex_lock(acb->s->queue_mutex); + pthread_cond_signal(acb->s->queue_threshold); + pthread_mutex_unlock(acb->s->queue_mutex); + } r = rados_aio_get_return_value(c); rados_aio_release(c); if (acb->write) { @@ -560,6 +581,14 @@ static BlockDriverAIOCB *rbd_aio_rw_vector(BlockDriverState *bs, rcb->segsize = segsize; rcb->buf = buf; + while (s->queuesize > MAX_QUEUE_SIZE) { + pthread_mutex_lock(s->queue_mutex); + pthread_cond_wait(s->queue_threshold, s->queue_mutex); + pthread_mutex_unlock(s->queue_mutex); + } + + s->queuesize += segsize; + if (write) { rados_aio_create_completion(rcb, NULL, (rados_callback_t) rbd_finish_aiocb,