[RFC,v2,05/22] block/pcache: add aio requests into cache

Message ID 20160829171021.4902-6-pbutsykin@virtuozzo.com (mailing list archive)
State New, archived

Commit Message

Pavel Butsykin Aug. 29, 2016, 5:10 p.m. UTC
Use an rbtree for storing requests; this adds the basic rbtree
operations for working with cache nodes.

Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
---
 block/pcache.c | 190 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 189 insertions(+), 1 deletion(-)

Comments

Kevin Wolf Sept. 1, 2016, 3:28 p.m. UTC | #1
Am 29.08.2016 um 19:10 hat Pavel Butsykin geschrieben:
> Use an rbtree for storing requests; this adds the basic rbtree
> operations for working with cache nodes.
> 
> Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
> ---
>  block/pcache.c | 190 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 189 insertions(+), 1 deletion(-)
> 
> diff --git a/block/pcache.c b/block/pcache.c
> index 7f221d6..f5022f9 100644
> --- a/block/pcache.c
> +++ b/block/pcache.c
> @@ -27,6 +27,7 @@
>  #include "block/raw-aio.h"
>  #include "qapi/error.h"
>  #include "qapi/qmp/qstring.h"
> +#include "qemu/rbtree.h"
>  
>  #define PCACHE_DEBUG
>  
> @@ -37,9 +38,53 @@
>  #define DPRINTF(fmt, ...) do { } while (0)
>  #endif
>  
> +typedef struct RbNodeKey {
> +    uint64_t    num;
> +    uint32_t    size;
> +} RbNodeKey;
> +
> +typedef struct BlockNode {
> +    struct RbNode               rb_node;
> +    union {
> +        RbNodeKey               key;
> +        struct {
> +            uint64_t            sector_num;
> +            uint32_t            nb_sectors;
> +        };
> +    };

What's the deal with this union?

It just adds an alias, and 'sector_num'/'nb_sectors' are actually longer
to type than 'key.num' and 'key.size', so the only advantage that I
could imagine doesn't really apply.

But it brings problems: We always have to be careful to keep the two
structs in sync, and grepping for the field name will only bring up half
of the users because the other half uses the other alias.
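
For illustration, dropping the alias might look something like this
(a sketch, not part of the patch):

    typedef struct BlockNode {
        struct RbNode           rb_node;
        RbNodeKey               key;
        QTAILQ_ENTRY(BlockNode) entry;
    } BlockNode;

    /* callers then use node->key.num and node->key.size directly */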

> +    QTAILQ_ENTRY(BlockNode) entry;
> +} BlockNode;
> +
> +typedef struct PCNode {
> +    BlockNode cm;
> +
> +    uint8_t                  *data;
> +} PCNode;

What do 'PC' and 'cm' mean?

> +typedef struct ReqStor {
> +    struct {
> +        struct RbRoot root;
> +        CoMutex       lock;
> +    } tree;
> +
> +    uint32_t curr_size;
> +} ReqStor;

Same question for ReqStor. For an identifier that is used only three
times, it could be a bit more descriptive.

What unit does curr_size use, or what does it count? The nodes in the tree?
Also, cur_ seems to be more common as a prefix than curr_.

> +typedef struct BDRVPCacheState {
> +    BlockDriverState **bs;

This is unused. (Good thing, it looks weird.)

> +    ReqStor pcache;
> +
> +    struct {
> +        QTAILQ_HEAD(pcache_head, BlockNode) head;
> +        CoMutex lock;
> +    } list;
> +} BDRVPCacheState;
> +
>  typedef struct PrefCacheAIOCB {
>      BlockAIOCB common;
>  
> +    BDRVPCacheState *s;

Not really needed, you already have acb->common.bs.
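
Wherever the state is needed, it can be derived on the spot, e.g.
(sketch):

    BDRVPCacheState *s = acb->common.bs->opaque;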

>      QEMUIOVector *qiov;
>      uint64_t sector_num;
>      uint32_t nb_sectors;
> @@ -64,6 +109,124 @@ static QemuOptsList runtime_opts = {
>      },
>  };
>  
> +#define PCNODE(_n) ((PCNode *)(_n))

container_of() would be preferable for type safety.
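
For instance, something along these lines, assuming the argument is
always a BlockNode pointer (sketch, untested):

    #define PCNODE(_n) container_of(_n, PCNode, cm)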

> +static int pcache_key_cmp(const RbNodeKey *key1, const RbNodeKey *key2)
> +{
> +    assert(key1 != NULL);
> +    assert(key2 != NULL);
> +
> +    if (key1->num >= key2->num + key2->size) {
> +        return 1;
> +    }
> +    if (key1->num + key1->size <= key2->num) {
> +        return -1;
> +    }
> +
> +    return 0;
> +}
> +
> +static void *node_insert(struct RbRoot *root, BlockNode *node)
> +{
> +    struct RbNode **new = &(root->rb_node), *parent = NULL;
> +
> +    /* Figure out where to put new node */
> +    while (*new) {
> +        BlockNode *this = container_of(*new, BlockNode, rb_node);
> +        int result = pcache_key_cmp(&node->key, &this->key);
> +        if (result == 0) {
> +            return this;
> +        }
> +        parent = *new;
> +        new = result < 0 ? &((*new)->rb_left) : &((*new)->rb_right);
> +    }
> +    /* Add new node and rebalance tree. */
> +    rb_link_node(&node->rb_node, parent, new);
> +    rb_insert_color(&node->rb_node, root);
> +
> +    return node;
> +}
> +
> +static inline PCNode *pcache_node_insert(struct RbRoot *root, PCNode *node)
> +{
> +    return node_insert(root, &node->cm);
> +}
> +
> +static inline void pcache_node_free(PCNode *node)
> +{
> +    g_free(node->data);
> +    g_slice_free1(sizeof(*node), node);
> +}

We moved away from g_slice_* because it turned out that it hurt more
than it helped.
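
A sketch of the same helpers with plain g_new()/g_free() (untested,
also folding in the key assignment mentioned below):

    static inline void pcache_node_free(PCNode *node)
    {
        g_free(node->data);
        g_free(node);
    }

    static inline void *pcache_node_alloc(RbNodeKey *key)
    {
        PCNode *node = g_new(PCNode, 1);

        node->cm.key = *key;
        node->data = g_malloc(key->size << BDRV_SECTOR_BITS);

        return node;
    }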

> +static inline void *pcache_node_alloc(RbNodeKey* key)
> +{
> +    PCNode *node = g_slice_alloc(sizeof(*node));
> +
> +    node->cm.sector_num = key->num;
> +    node->cm.nb_sectors = key->size;

In other words, node->cm.key = *key;

> +    node->data = g_malloc(node->cm.nb_sectors << BDRV_SECTOR_BITS);
> +
> +    return node;
> +}
> +
> +static bool pcache_node_find_and_create(PrefCacheAIOCB *acb, RbNodeKey *key,
> +                                        PCNode **out_node)
> +{
> +    BDRVPCacheState *s = acb->s;
> +    PCNode *new_node = pcache_node_alloc(key);
> +    PCNode *found;
> +
> +    qemu_co_mutex_lock(&s->pcache.tree.lock);
> +    found = pcache_node_insert(&s->pcache.tree.root, new_node);
> +    qemu_co_mutex_unlock(&s->pcache.tree.lock);

pcache_node_insert() doesn't yield, so the CoMutex is unnecessary.

> +    if (found != new_node) {
> +        pcache_node_free(new_node);

Isn't it a bit wasteful to allocate a new node just in case and then
immediately free it again if it turns out that we don't need it?
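
One way around that (a sketch, untested) would be to search the tree
first and only allocate on a miss, reusing pcache_key_cmp():

    static PCNode *pcache_node_search(struct RbRoot *root, RbNodeKey *key)
    {
        struct RbNode *n = root->rb_node;

        while (n) {
            BlockNode *node = container_of(n, BlockNode, rb_node);
            int result = pcache_key_cmp(key, &node->key);
            if (result == 0) {
                return PCNODE(node);
            }
            n = result < 0 ? n->rb_left : n->rb_right;
        }
        return NULL;
    }

    /* in pcache_node_find_and_create(): */
    PCNode *found = pcache_node_search(&s->pcache.tree.root, key);
    if (found) {
        *out_node = found;
        return false;
    }
    /* only on a miss: allocate and insert the new node as before */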

> +        *out_node = found;
> +        return false;
> +    }
> +    atomic_add(&s->pcache.curr_size, new_node->cm.nb_sectors);

atomic_add() implies that you have concurrent threads. I don't see any.

> +    qemu_co_mutex_lock(&s->list.lock);
> +    QTAILQ_INSERT_HEAD(&s->list.head, &new_node->cm, entry);
> +    qemu_co_mutex_unlock(&s->list.lock);

Same here as above, QTAILQ_INSERT_HEAD doesn't yield.

> +    *out_node = new_node;
> +    return true;
> +}
> +
> +static inline void prefetch_init_key(PrefCacheAIOCB *acb, RbNodeKey* key)
> +{
> +    key->num = acb->sector_num;
> +    key->size = acb->nb_sectors;
> +}
> +
> +enum {
> +    PREFETCH_NEW_NODE  = 0,
> +    PREFETCH_FULL_UP   = 1,
> +    PREFETCH_PART_UP   = 2
> +};
> +
> +static int32_t pcache_prefetch(PrefCacheAIOCB *acb)
> +{
> +    RbNodeKey key;
> +    PCNode *node = NULL;
> +
> +    prefetch_init_key(acb, &key);
> +    if (pcache_node_find_and_create(acb, &key, &node)) {
> +        return PREFETCH_NEW_NODE;
> +    }
> +
> +    /* Node covers the whole request */
> +    if (node->cm.sector_num <= acb->sector_num &&
> +        node->cm.sector_num + node->cm.nb_sectors >= acb->sector_num +
> +                                                     acb->nb_sectors)
> +    {
> +        return PREFETCH_FULL_UP;
> +    }
> +
> +    return PREFETCH_PART_UP;
> +}
> +
>  static void pcache_aio_cb(void *opaque, int ret)
>  {
>      PrefCacheAIOCB *acb = opaque;
> @@ -80,6 +243,7 @@ static PrefCacheAIOCB *pcache_aio_get(BlockDriverState *bs, int64_t sector_num,
>  {
>      PrefCacheAIOCB *acb = qemu_aio_get(&pcache_aiocb_info, bs, cb, opaque);
>  
> +    acb->s = bs->opaque;
>      acb->sector_num = sector_num;
>      acb->nb_sectors = nb_sectors;
>      acb->qiov = qiov;
> @@ -99,6 +263,8 @@ static BlockAIOCB *pcache_aio_readv(BlockDriverState *bs,
>      PrefCacheAIOCB *acb = pcache_aio_get(bs, sector_num, qiov, nb_sectors, cb,
>                                           opaque, QEMU_AIO_READ);
>  
> +    pcache_prefetch(acb);
> +
>      bdrv_aio_readv(bs->file, sector_num, qiov, nb_sectors,
>                     pcache_aio_cb, acb);
>      return &acb->common;
> @@ -119,6 +285,17 @@ static BlockAIOCB *pcache_aio_writev(BlockDriverState *bs,
>      return &acb->common;
>  }
>  
> +static void pcache_state_init(QemuOpts *opts, BDRVPCacheState *s)
> +{
> +    DPRINTF("pcache configure:\n");
> +
> +    s->pcache.tree.root = RB_ROOT;
> +    qemu_co_mutex_init(&s->pcache.tree.lock);
> +    QTAILQ_INIT(&s->list.head);
> +    qemu_co_mutex_init(&s->list.lock);

QTAILQ_INIT() doesn't yield.

> +    s->pcache.curr_size = 0;
> +}
> +

Kevin
Pavel Butsykin Sept. 6, 2016, 4:54 p.m. UTC | #2
On 01.09.2016 18:28, Kevin Wolf wrote:
> Am 29.08.2016 um 19:10 hat Pavel Butsykin geschrieben:
>> Use an rbtree for storing requests; this adds the basic rbtree
>> operations for working with cache nodes.
>>
>> Signed-off-by: Pavel Butsykin <pbutsykin@virtuozzo.com>
>> ---
>>   block/pcache.c | 190 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
>>   1 file changed, 189 insertions(+), 1 deletion(-)
>>
>> diff --git a/block/pcache.c b/block/pcache.c
>> index 7f221d6..f5022f9 100644
>> --- a/block/pcache.c
>> +++ b/block/pcache.c
>> @@ -27,6 +27,7 @@
>>   #include "block/raw-aio.h"
>>   #include "qapi/error.h"
>>   #include "qapi/qmp/qstring.h"
>> +#include "qemu/rbtree.h"
>>
>>   #define PCACHE_DEBUG
>>
>> @@ -37,9 +38,53 @@
>>   #define DPRINTF(fmt, ...) do { } while (0)
>>   #endif
>>
>> +typedef struct RbNodeKey {
>> +    uint64_t    num;
>> +    uint32_t    size;
>> +} RbNodeKey;
>> +
>> +typedef struct BlockNode {
>> +    struct RbNode               rb_node;
>> +    union {
>> +        RbNodeKey               key;
>> +        struct {
>> +            uint64_t            sector_num;
>> +            uint32_t            nb_sectors;
>> +        };
>> +    };
>
> What's the deal with this union?
>
> It just adds an alias, and 'sector_num'/'nb_sectors' are actually longer
> to type than 'key.num' and 'key.size', so the only advantage that I
> could imagine doesn't really apply.
>
Yes, you're right. Initially this was done to shorten the code in
places where the renamed fields wouldn't cause confusion, but it later
turned out that there are few such places.

> But it brings problems: We always have to be careful to keep the two
> structs in sync, and grepping for the field name will only bring up half
> of the users because the other half uses the other alias.
>
Yep, compared to the rest of the QEMU code it looks unusual, and given
the small benefit it needs to be removed :)

>> +    QTAILQ_ENTRY(BlockNode) entry;
>> +} BlockNode;
>> +
>> +typedef struct PCNode {
>> +    BlockNode cm;
>> +
>> +    uint8_t                  *data;
>> +} PCNode;
>
> What do 'PC' and 'cm' mean?
>
'PC' stands for PrefetchCache, 'cm' for common.

But I think it would be better to spell them out:

PCNode -> PrefCacheNode, cm -> common.

>> +typedef struct ReqStor {
>> +    struct {
>> +        struct RbRoot root;
>> +        CoMutex       lock;
>> +    } tree;
>> +
>> +    uint32_t curr_size;
>> +} ReqStor;
>
> Same question for ReqStor. For an identifier that is used only three
> times, it could be a bit more descriptive.
>
It stands for storage of requests.

> What unit does curr_size use, or what does it count? The nodes in the tree?

This is the current size of the stored requests, in sectors.

> Also, cur_ seems to be more common as a prefix than curr_.
>
OK

>> +typedef struct BDRVPCacheState {
>> +    BlockDriverState **bs;
>
> This is unused. (Good thing, it looks weird.)
>
Apparently I forgot to remove it.

>> +    ReqStor pcache;
>> +
>> +    struct {
>> +        QTAILQ_HEAD(pcache_head, BlockNode) head;
>> +        CoMutex lock;
>> +    } list;
>> +} BDRVPCacheState;
>> +
>>   typedef struct PrefCacheAIOCB {
>>       BlockAIOCB common;
>>
>> +    BDRVPCacheState *s;
>
> Not really needed, you already have acb->common.bs.
>
>>       QEMUIOVector *qiov;
>>       uint64_t sector_num;
>>       uint32_t nb_sectors;
>> @@ -64,6 +109,124 @@ static QemuOptsList runtime_opts = {
>>       },
>>   };
>>
>> +#define PCNODE(_n) ((PCNode *)(_n))
>
> container_of() would be preferable for type safety.
>
OK

>> +static int pcache_key_cmp(const RbNodeKey *key1, const RbNodeKey *key2)
>> +{
>> +    assert(key1 != NULL);
>> +    assert(key2 != NULL);
>> +
>> +    if (key1->num >= key2->num + key2->size) {
>> +        return 1;
>> +    }
>> +    if (key1->num + key1->size <= key2->num) {
>> +        return -1;
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>> +static void *node_insert(struct RbRoot *root, BlockNode *node)
>> +{
>> +    struct RbNode **new = &(root->rb_node), *parent = NULL;
>> +
>> +    /* Figure out where to put new node */
>> +    while (*new) {
>> +        BlockNode *this = container_of(*new, BlockNode, rb_node);
>> +        int result = pcache_key_cmp(&node->key, &this->key);
>> +        if (result == 0) {
>> +            return this;
>> +        }
>> +        parent = *new;
>> +        new = result < 0 ? &((*new)->rb_left) : &((*new)->rb_right);
>> +    }
>> +    /* Add new node and rebalance tree. */
>> +    rb_link_node(&node->rb_node, parent, new);
>> +    rb_insert_color(&node->rb_node, root);
>> +
>> +    return node;
>> +}
>> +
>> +static inline PCNode *pcache_node_insert(struct RbRoot *root, PCNode *node)
>> +{
>> +    return node_insert(root, &node->cm);
>> +}
>> +
>> +static inline void pcache_node_free(PCNode *node)
>> +{
>> +    g_free(node->data);
>> +    g_slice_free1(sizeof(*node), node);
>> +}
>
> We moved away from g_slice_* because it turned out that it hurt more
> than it helped.
>
I saw that somewhere, thanks.

>> +static inline void *pcache_node_alloc(RbNodeKey* key)
>> +{
>> +    PCNode *node = g_slice_alloc(sizeof(*node));
>> +
>> +    node->cm.sector_num = key->num;
>> +    node->cm.nb_sectors = key->size;
>
> In other words, node->cm.key = *key;
>
Yep :)

>> +    node->data = g_malloc(node->cm.nb_sectors << BDRV_SECTOR_BITS);
>> +
>> +    return node;
>> +}
>> +
>> +static bool pcache_node_find_and_create(PrefCacheAIOCB *acb, RbNodeKey *key,
>> +                                        PCNode **out_node)
>> +{
>> +    BDRVPCacheState *s = acb->s;
>> +    PCNode *new_node = pcache_node_alloc(key);
>> +    PCNode *found;
>> +
>> +    qemu_co_mutex_lock(&s->pcache.tree.lock);
>> +    found = pcache_node_insert(&s->pcache.tree.root, new_node);
>> +    qemu_co_mutex_unlock(&s->pcache.tree.lock);
>
> pcache_node_insert() doesn't yield, so the CoMutex is unnecessary.
>
>> +    if (found != new_node) {
>> +        pcache_node_free(new_node);
>
> Isn't it a bit wasteful to allocate a new node just in case and then
> immediately free it again if it turns out that we don't need it?
>
Yes, I know about this problem. I mentioned it in the TODO list, but
not in much detail.

>> +        *out_node = found;
>> +        return false;
>> +    }
>> +    atomic_add(&s->pcache.curr_size, new_node->cm.nb_sectors);
>
> atomic_add() implies that you have concurrent threads. I don't see any.
>
Yeah, but what about an iothread? Doesn't the presence of an iothread
lead to parallel handling of requests in multiple threads?

>> +    qemu_co_mutex_lock(&s->list.lock);
>> +    QTAILQ_INSERT_HEAD(&s->list.head, &new_node->cm, entry);
>> +    qemu_co_mutex_unlock(&s->list.lock);
>
> Same here as above, QTAILQ_INSERT_HEAD doesn't yield.
>
>> +    *out_node = new_node;
>> +    return true;
>> +}
>> +
>> +static inline void prefetch_init_key(PrefCacheAIOCB *acb, RbNodeKey* key)
>> +{
>> +    key->num = acb->sector_num;
>> +    key->size = acb->nb_sectors;
>> +}
>> +
>> +enum {
>> +    PREFETCH_NEW_NODE  = 0,
>> +    PREFETCH_FULL_UP   = 1,
>> +    PREFETCH_PART_UP   = 2
>> +};
>> +
>> +static int32_t pcache_prefetch(PrefCacheAIOCB *acb)
>> +{
>> +    RbNodeKey key;
>> +    PCNode *node = NULL;
>> +
>> +    prefetch_init_key(acb, &key);
>> +    if (pcache_node_find_and_create(acb, &key, &node)) {
>> +        return PREFETCH_NEW_NODE;
>> +    }
>> +
>> +    /* Node covers the whole request */
>> +    if (node->cm.sector_num <= acb->sector_num &&
>> +        node->cm.sector_num + node->cm.nb_sectors >= acb->sector_num +
>> +                                                     acb->nb_sectors)
>> +    {
>> +        return PREFETCH_FULL_UP;
>> +    }
>> +
>> +    return PREFETCH_PART_UP;
>> +}
>> +
>>   static void pcache_aio_cb(void *opaque, int ret)
>>   {
>>       PrefCacheAIOCB *acb = opaque;
>> @@ -80,6 +243,7 @@ static PrefCacheAIOCB *pcache_aio_get(BlockDriverState *bs, int64_t sector_num,
>>   {
>>       PrefCacheAIOCB *acb = qemu_aio_get(&pcache_aiocb_info, bs, cb, opaque);
>>
>> +    acb->s = bs->opaque;
>>       acb->sector_num = sector_num;
>>       acb->nb_sectors = nb_sectors;
>>       acb->qiov = qiov;
>> @@ -99,6 +263,8 @@ static BlockAIOCB *pcache_aio_readv(BlockDriverState *bs,
>>       PrefCacheAIOCB *acb = pcache_aio_get(bs, sector_num, qiov, nb_sectors, cb,
>>                                            opaque, QEMU_AIO_READ);
>>
>> +    pcache_prefetch(acb);
>> +
>>       bdrv_aio_readv(bs->file, sector_num, qiov, nb_sectors,
>>                      pcache_aio_cb, acb);
>>       return &acb->common;
>> @@ -119,6 +285,17 @@ static BlockAIOCB *pcache_aio_writev(BlockDriverState *bs,
>>       return &acb->common;
>>   }
>>
>> +static void pcache_state_init(QemuOpts *opts, BDRVPCacheState *s)
>> +{
>> +    DPRINTF("pcache configure:\n");
>> +
>> +    s->pcache.tree.root = RB_ROOT;
>> +    qemu_co_mutex_init(&s->pcache.tree.lock);
>> +    QTAILQ_INIT(&s->list.head);
>> +    qemu_co_mutex_init(&s->list.lock);
>
> QTAILQ_INIT() doesn't yield.
>
>> +    s->pcache.curr_size = 0;
>> +}
>> +
>
> Kevin
>
Kevin Wolf Sept. 6, 2016, 5:07 p.m. UTC | #3
Am 06.09.2016 um 18:54 hat Pavel Butsykin geschrieben:
> >>+        *out_node = found;
> >>+        return false;
> >>+    }
> >>+    atomic_add(&s->pcache.curr_size, new_node->cm.nb_sectors);
> >
> >atomic_add() implies that you have concurrent threads. I don't see any.
> >
> Yeah, but what about an iothread? Doesn't the presence of an iothread
> lead to parallel handling of requests in multiple threads?

No, it doesn't. We're running under the AioContext lock.

If we were actually running in parallel threads, your locks might also
be needed, but then they would have to be real thread locks instead of
coroutine locks. But it doesn't happen today, so I think it's best to
ignore.
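
With everything serialized under the AioContext lock, a plain update
would be sufficient (sketch):

    s->pcache.curr_size += new_node->cm.nb_sectors;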

Kevin
Pavel Butsykin Sept. 7, 2016, 4:21 p.m. UTC | #4
On 06.09.2016 20:07, Kevin Wolf wrote:
> Am 06.09.2016 um 18:54 hat Pavel Butsykin geschrieben:
>>>> +        *out_node = found;
>>>> +        return false;
>>>> +    }
>>>> +    atomic_add(&s->pcache.curr_size, new_node->cm.nb_sectors);
>>>
>>> atomic_add() implies that you have concurrent threads. I don't see any.
>>>
>> Yeah, but what about an iothread? Doesn't the presence of an iothread
>> lead to parallel handling of requests in multiple threads?
>
> No, it doesn't. We're running under the AioContext lock.

Yes, you're right. For some reason I thought there was such a
possibility :)

> If we were actually running in parallel threads, your locks might also
> be needed, but then they would have to be real thread locks instead of
> coroutine locks. But it doesn't happen today, so I think it's best to
> ignore.

I agree, using coroutine locks to protect the data was a bad idea.

> Kevin
>

Patch

diff --git a/block/pcache.c b/block/pcache.c
index 7f221d6..f5022f9 100644
--- a/block/pcache.c
+++ b/block/pcache.c
@@ -27,6 +27,7 @@ 
 #include "block/raw-aio.h"
 #include "qapi/error.h"
 #include "qapi/qmp/qstring.h"
+#include "qemu/rbtree.h"
 
 #define PCACHE_DEBUG
 
@@ -37,9 +38,53 @@ 
 #define DPRINTF(fmt, ...) do { } while (0)
 #endif
 
+typedef struct RbNodeKey {
+    uint64_t    num;
+    uint32_t    size;
+} RbNodeKey;
+
+typedef struct BlockNode {
+    struct RbNode               rb_node;
+    union {
+        RbNodeKey               key;
+        struct {
+            uint64_t            sector_num;
+            uint32_t            nb_sectors;
+        };
+    };
+    QTAILQ_ENTRY(BlockNode) entry;
+} BlockNode;
+
+typedef struct PCNode {
+    BlockNode cm;
+
+    uint8_t                  *data;
+} PCNode;
+
+typedef struct ReqStor {
+    struct {
+        struct RbRoot root;
+        CoMutex       lock;
+    } tree;
+
+    uint32_t curr_size;
+} ReqStor;
+
+typedef struct BDRVPCacheState {
+    BlockDriverState **bs;
+
+    ReqStor pcache;
+
+    struct {
+        QTAILQ_HEAD(pcache_head, BlockNode) head;
+        CoMutex lock;
+    } list;
+} BDRVPCacheState;
+
 typedef struct PrefCacheAIOCB {
     BlockAIOCB common;
 
+    BDRVPCacheState *s;
     QEMUIOVector *qiov;
     uint64_t sector_num;
     uint32_t nb_sectors;
@@ -64,6 +109,124 @@  static QemuOptsList runtime_opts = {
     },
 };
 
+#define PCNODE(_n) ((PCNode *)(_n))
+
+static int pcache_key_cmp(const RbNodeKey *key1, const RbNodeKey *key2)
+{
+    assert(key1 != NULL);
+    assert(key2 != NULL);
+
+    if (key1->num >= key2->num + key2->size) {
+        return 1;
+    }
+    if (key1->num + key1->size <= key2->num) {
+        return -1;
+    }
+
+    return 0;
+}
+
+static void *node_insert(struct RbRoot *root, BlockNode *node)
+{
+    struct RbNode **new = &(root->rb_node), *parent = NULL;
+
+    /* Figure out where to put new node */
+    while (*new) {
+        BlockNode *this = container_of(*new, BlockNode, rb_node);
+        int result = pcache_key_cmp(&node->key, &this->key);
+        if (result == 0) {
+            return this;
+        }
+        parent = *new;
+        new = result < 0 ? &((*new)->rb_left) : &((*new)->rb_right);
+    }
+    /* Add new node and rebalance tree. */
+    rb_link_node(&node->rb_node, parent, new);
+    rb_insert_color(&node->rb_node, root);
+
+    return node;
+}
+
+static inline PCNode *pcache_node_insert(struct RbRoot *root, PCNode *node)
+{
+    return node_insert(root, &node->cm);
+}
+
+static inline void pcache_node_free(PCNode *node)
+{
+    g_free(node->data);
+    g_slice_free1(sizeof(*node), node);
+}
+
+static inline void *pcache_node_alloc(RbNodeKey* key)
+{
+    PCNode *node = g_slice_alloc(sizeof(*node));
+
+    node->cm.sector_num = key->num;
+    node->cm.nb_sectors = key->size;
+    node->data = g_malloc(node->cm.nb_sectors << BDRV_SECTOR_BITS);
+
+    return node;
+}
+
+static bool pcache_node_find_and_create(PrefCacheAIOCB *acb, RbNodeKey *key,
+                                        PCNode **out_node)
+{
+    BDRVPCacheState *s = acb->s;
+    PCNode *new_node = pcache_node_alloc(key);
+    PCNode *found;
+
+    qemu_co_mutex_lock(&s->pcache.tree.lock);
+    found = pcache_node_insert(&s->pcache.tree.root, new_node);
+    qemu_co_mutex_unlock(&s->pcache.tree.lock);
+    if (found != new_node) {
+        pcache_node_free(new_node);
+        *out_node = found;
+        return false;
+    }
+    atomic_add(&s->pcache.curr_size, new_node->cm.nb_sectors);
+
+    qemu_co_mutex_lock(&s->list.lock);
+    QTAILQ_INSERT_HEAD(&s->list.head, &new_node->cm, entry);
+    qemu_co_mutex_unlock(&s->list.lock);
+
+    *out_node = new_node;
+    return true;
+}
+
+static inline void prefetch_init_key(PrefCacheAIOCB *acb, RbNodeKey* key)
+{
+    key->num = acb->sector_num;
+    key->size = acb->nb_sectors;
+}
+
+enum {
+    PREFETCH_NEW_NODE  = 0,
+    PREFETCH_FULL_UP   = 1,
+    PREFETCH_PART_UP   = 2
+};
+
+static int32_t pcache_prefetch(PrefCacheAIOCB *acb)
+{
+    RbNodeKey key;
+    PCNode *node = NULL;
+
+    prefetch_init_key(acb, &key);
+    if (pcache_node_find_and_create(acb, &key, &node)) {
+        return PREFETCH_NEW_NODE;
+    }
+
+    /* Node covers the whole request */
+    if (node->cm.sector_num <= acb->sector_num &&
+        node->cm.sector_num + node->cm.nb_sectors >= acb->sector_num +
+                                                     acb->nb_sectors)
+    {
+        return PREFETCH_FULL_UP;
+    }
+
+    return PREFETCH_PART_UP;
+}
+
 static void pcache_aio_cb(void *opaque, int ret)
 {
     PrefCacheAIOCB *acb = opaque;
@@ -80,6 +243,7 @@  static PrefCacheAIOCB *pcache_aio_get(BlockDriverState *bs, int64_t sector_num,
 {
     PrefCacheAIOCB *acb = qemu_aio_get(&pcache_aiocb_info, bs, cb, opaque);
 
+    acb->s = bs->opaque;
     acb->sector_num = sector_num;
     acb->nb_sectors = nb_sectors;
     acb->qiov = qiov;
@@ -99,6 +263,8 @@  static BlockAIOCB *pcache_aio_readv(BlockDriverState *bs,
     PrefCacheAIOCB *acb = pcache_aio_get(bs, sector_num, qiov, nb_sectors, cb,
                                          opaque, QEMU_AIO_READ);
 
+    pcache_prefetch(acb);
+
     bdrv_aio_readv(bs->file, sector_num, qiov, nb_sectors,
                    pcache_aio_cb, acb);
     return &acb->common;
@@ -119,6 +285,17 @@  static BlockAIOCB *pcache_aio_writev(BlockDriverState *bs,
     return &acb->common;
 }
 
+static void pcache_state_init(QemuOpts *opts, BDRVPCacheState *s)
+{
+    DPRINTF("pcache configure:\n");
+
+    s->pcache.tree.root = RB_ROOT;
+    qemu_co_mutex_init(&s->pcache.tree.lock);
+    QTAILQ_INIT(&s->list.head);
+    qemu_co_mutex_init(&s->list.lock);
+    s->pcache.curr_size = 0;
+}
+
 static int pcache_file_open(BlockDriverState *bs, QDict *options, int flags,
                             Error **errp)
 {
@@ -140,7 +317,9 @@  static int pcache_file_open(BlockDriverState *bs, QDict *options, int flags,
     if (local_err) {
         ret = -EINVAL;
         error_propagate(errp, local_err);
+        goto fail;
     }
+    pcache_state_init(opts, bs->opaque);
 fail:
     qemu_opts_del(opts);
     return ret;
@@ -148,6 +327,15 @@  fail:
 
 static void pcache_close(BlockDriverState *bs)
 {
+    uint32_t cnt = 0;
+    BDRVPCacheState *s = bs->opaque;
+    BlockNode *node, *next;
+    QTAILQ_FOREACH_SAFE(node, &s->list.head, entry, next) {
+        QTAILQ_REMOVE(&s->list.head, node, entry);
+        pcache_node_free(PCNODE(node));
+        cnt++;
+    }
+    DPRINTF("used %d nodes\n", cnt);
 }
 
 static void pcache_parse_filename(const char *filename, QDict *options,
@@ -170,7 +358,7 @@  static bool pcache_recurse_is_first_non_filter(BlockDriverState *bs,
 static BlockDriver bdrv_pcache = {
     .format_name                        = "pcache",
     .protocol_name                      = "pcache",
-    .instance_size                      = 0,
+    .instance_size                      = sizeof(BDRVPCacheState),
 
     .bdrv_parse_filename                = pcache_parse_filename,
     .bdrv_file_open                     = pcache_file_open,