[RFC,2/4] lightnvm: add write buffering for rrpc
diff mbox

Message ID 1454591299-30305-3-git-send-email-javier@javigon.com
State New
Headers show

Commit Message

=?UTF-8?q?Javier=20Gonz=C3=A1lez?= Feb. 4, 2016, 1:08 p.m. UTC
Flash controllers typically define flash pages as a collection of flash
sectors of typically 4K. Moreover, flash controllers might program flash
pages across several planes. This defines the write granurality at which
flash can be programmed. This is different for each flash controller. In
rrpc, the assumption has been that flash pages are 4K, thus writes could
be sent out to the media as they were received.

This patch adds a per-block write buffer to rrpc. Writes are flushed to
the media in chuncks of the minimum granurality allowed by the
controller, though the strategy can be changed to, for example, only
flush at the maximum supported (64 pages in nvme). Apart from enabling
the use of rrpc in real hardware, this buffering strategy will be used
for recovery; if a block becomes bad, a new block can be allocated to
persist valid data.

Signed-off-by: Javier González <javier@cnexlabs.com>
---
 drivers/lightnvm/rrpc.c  | 851 ++++++++++++++++++++++++++++++++++++++---------
 drivers/lightnvm/rrpc.h  |  82 ++++-
 include/linux/lightnvm.h |   6 +-
 3 files changed, 771 insertions(+), 168 deletions(-)

Comments

Matias Bjørling Feb. 5, 2016, 2:52 p.m. UTC | #1
On 02/04/2016 02:08 PM, Javier González wrote:
> Flash controllers typically define flash pages as a collection of flash
> sectors of typically 4K. Moreover, flash controllers might program flash
> pages across several planes. This defines the write granurality at which
> flash can be programmed. This is different for each flash controller. In
> rrpc, the assumption has been that flash pages are 4K, thus writes could
> be sent out to the media as they were received.
> 
> This patch adds a per-block write buffer to rrpc. Writes are flushed to
> the media in chuncks of the minimum granurality allowed by the
> controller, though the strategy can be changed to, for example, only
> flush at the maximum supported (64 pages in nvme). Apart from enabling
> the use of rrpc in real hardware, this buffering strategy will be used
> for recovery; if a block becomes bad, a new block can be allocated to
> persist valid data.

Great work. The code is nearly 2x'ed in size. Can you add a detailed
explanation on the inner workings of the write buffering to help new
users to understand its logic?

> 
> Signed-off-by: Javier González <javier@cnexlabs.com>
> ---
>  drivers/lightnvm/rrpc.c  | 851 ++++++++++++++++++++++++++++++++++++++---------
>  drivers/lightnvm/rrpc.h  |  82 ++++-
>  include/linux/lightnvm.h |   6 +-
>  3 files changed, 771 insertions(+), 168 deletions(-)
> 
> diff --git a/drivers/lightnvm/rrpc.c b/drivers/lightnvm/rrpc.c
> index 8187bf3..e9fb19d 100644
> --- a/drivers/lightnvm/rrpc.c
> +++ b/drivers/lightnvm/rrpc.c
> @@ -16,11 +16,12 @@
>  
>  #include "rrpc.h"
>  
> -static struct kmem_cache *rrpc_gcb_cache, *rrpc_rq_cache;
> +static struct kmem_cache *rrpc_gcb_cache, *rrpc_rq_cache, *rrpc_rrq_cache,
> +			*rrpc_buf_rrq_cache, *rrpc_wb_cache, *rrpc_block_cache;
>  static DECLARE_RWSEM(rrpc_lock);
>  
>  static int rrpc_submit_io(struct rrpc *rrpc, struct bio *bio,
> -				struct nvm_rq *rqd, unsigned long flags);
> +				struct rrpc_rq *rqd, unsigned long flags);
>  
>  #define rrpc_for_each_lun(rrpc, rlun, i) \
>  		for ((i) = 0, rlun = &(rrpc)->luns[0]; \
> @@ -62,53 +63,59 @@ static void rrpc_invalidate_range(struct rrpc *rrpc, sector_t slba,
>  	spin_unlock(&rrpc->rev_lock);
>  }
>  
> -static struct nvm_rq *rrpc_inflight_laddr_acquire(struct rrpc *rrpc,
> +static void rrpc_release_and_free_rrqd(struct kref *ref)
> +{
> +	struct rrpc_rq *rrqd = container_of(ref, struct rrpc_rq, refs);
> +	struct rrpc *rrpc = rrqd->addr->rblk->rlun->rrpc;

Wow.. Happy that we got to rrpc :)

> +
> +	rrpc_unlock_rq(rrpc, rrqd);
> +	mempool_free(rrqd, rrpc->rrq_pool);
> +}
> +
> +static struct rrpc_rq *rrpc_inflight_laddr_acquire(struct rrpc *rrpc,
>  					sector_t laddr, unsigned int pages)
>  {
> -	struct nvm_rq *rqd;
> +	struct rrpc_rq *rrqd;
>  	struct rrpc_inflight_rq *inf;
>  
> -	rqd = mempool_alloc(rrpc->rq_pool, GFP_ATOMIC);
> -	if (!rqd)
> +	rrqd = mempool_alloc(rrpc->rrq_pool, GFP_ATOMIC);
> +	if (!rrqd)
>  		return ERR_PTR(-ENOMEM);
> +	kref_init(&rrqd->refs);
>  
> -	inf = rrpc_get_inflight_rq(rqd);
> +	inf = rrpc_get_inflight_rq(rrqd);
>  	if (rrpc_lock_laddr(rrpc, laddr, pages, inf)) {
> -		mempool_free(rqd, rrpc->rq_pool);
> +		mempool_free(rrqd, rrpc->rrq_pool);
>  		return NULL;
>  	}
>  
> -	return rqd;
> +	return rrqd;
>  }
>  
> -static void rrpc_inflight_laddr_release(struct rrpc *rrpc, struct nvm_rq *rqd)
> +static void rrpc_inflight_laddr_release(struct rrpc *rrpc, struct rrpc_rq *rrqd)
>  {
> -	struct rrpc_inflight_rq *inf = rrpc_get_inflight_rq(rqd);
> -
> -	rrpc_unlock_laddr(rrpc, inf);
> -
> -	mempool_free(rqd, rrpc->rq_pool);
> +	kref_put(&rrqd->refs, rrpc_release_and_free_rrqd);
>  }
>  
>  static void rrpc_discard(struct rrpc *rrpc, struct bio *bio)
>  {
>  	sector_t slba = bio->bi_iter.bi_sector / NR_PHY_IN_LOG;
>  	sector_t len = bio->bi_iter.bi_size / RRPC_EXPOSED_PAGE_SIZE;
> -	struct nvm_rq *rqd;
> +	struct rrpc_rq *rrqd;
>  
>  	do {
> -		rqd = rrpc_inflight_laddr_acquire(rrpc, slba, len);
> +		rrqd = rrpc_inflight_laddr_acquire(rrpc, slba, len);
>  		schedule();
> -	} while (!rqd);
> +	} while (!rrqd);
>  
> -	if (IS_ERR(rqd)) {
> +	if (IS_ERR(rrqd)) {
>  		pr_err("rrpc: unable to acquire inflight IO\n");
>  		bio_io_error(bio);
>  		return;
>  	}
>  
>  	rrpc_invalidate_range(rrpc, slba, len);
> -	rrpc_inflight_laddr_release(rrpc, rqd);
> +	rrpc_inflight_laddr_release(rrpc, rrqd);
>  }
>  
>  static int block_is_full(struct rrpc *rrpc, struct rrpc_block *rblk)
> @@ -166,8 +173,6 @@ static void rrpc_set_lun_cur(struct rrpc_lun *rlun, struct rrpc_block *rblk)
>  {
>  	struct rrpc *rrpc = rlun->rrpc;
>  
> -	BUG_ON(!rblk);
> -
>  	if (rlun->cur) {
>  		spin_lock(&rlun->cur->lock);
>  		WARN_ON(!block_is_full(rrpc, rlun->cur));
> @@ -176,12 +181,107 @@ static void rrpc_set_lun_cur(struct rrpc_lun *rlun, struct rrpc_block *rblk)
>  	rlun->cur = rblk;
>  }
>  
> +static void rrpc_free_w_buffer(struct rrpc *rrpc, struct rrpc_block *rblk)
> +{
> +try:
> +	spin_lock(&rblk->w_buf.s_lock);
> +	if (atomic_read(&rblk->w_buf.refs) > 0) {
> +		spin_unlock(&rblk->w_buf.s_lock);
> +		schedule();
> +		goto try;
> +	}
> +
> +	mempool_free(rblk->w_buf.entries, rrpc->block_pool);
> +	rblk->w_buf.entries = NULL;
> +	spin_unlock(&rblk->w_buf.s_lock);
> +
> +	/* TODO: Reuse the same buffers if the block size is the same */
> +	mempool_free(rblk->w_buf.data, rrpc->write_buf_pool);
> +	kfree(rblk->w_buf.sync_bitmap);
> +
> +	rblk->w_buf.mem = NULL;
> +	rblk->w_buf.subm = NULL;
> +	rblk->w_buf.sync_bitmap = NULL;
> +	rblk->w_buf.data = NULL;
> +	rblk->w_buf.nentries = 0;
> +	rblk->w_buf.cur_mem = 0;
> +	rblk->w_buf.cur_subm = 0;
> +}
> +
> +static void rrpc_put_blk(struct rrpc *rrpc, struct rrpc_block *rblk)
> +{
> +	struct rrpc_lun *rlun = rblk->rlun;
> +	struct nvm_lun *lun = rlun->parent;
> +	struct rrpc_w_buf *buf = &rblk->w_buf;
> +
> +try:
> +	spin_lock(&buf->w_lock);
> +	/* Flush inflight I/Os */
> +	if (!bitmap_full(buf->sync_bitmap, buf->cur_mem)) {
> +		spin_unlock(&buf->w_lock);
> +		schedule();
> +		goto try;
> +	}
> +	spin_unlock(&buf->w_lock);
> +
> +	if (rblk->w_buf.entries)
> +		rrpc_free_w_buffer(rrpc, rblk);
> +
> +	spin_lock(&lun->lock);
> +	nvm_put_blk_unlocked(rrpc->dev, rblk->parent);
> +	list_del(&rblk->list);
> +	spin_unlock(&lun->lock);
> +}
> +
> +static void rrpc_put_blks(struct rrpc *rrpc)
> +{
> +	struct rrpc_lun *rlun;
> +	int i;
> +
> +	for (i = 0; i < rrpc->nr_luns; i++) {
> +		rlun = &rrpc->luns[i];
> +		if (rlun->cur)
> +			rrpc_put_blk(rrpc, rlun->cur);
> +		if (rlun->gc_cur)
> +			rrpc_put_blk(rrpc, rlun->gc_cur);
> +	}
> +}
> +
>  static struct rrpc_block *rrpc_get_blk(struct rrpc *rrpc, struct rrpc_lun *rlun,
>  							unsigned long flags)
>  {
> +	struct nvm_dev *dev = rrpc->dev;
>  	struct nvm_lun *lun = rlun->parent;
>  	struct nvm_block *blk;
>  	struct rrpc_block *rblk;
> +	struct buf_entry *entries;
> +	unsigned long *sync_bitmap;
> +	void *data;
> +	int nentries = dev->pgs_per_blk * dev->sec_per_pg;
> +
> +	data = mempool_alloc(rrpc->write_buf_pool, GFP_ATOMIC);
> +	if (!data) {
> +		pr_err("nvm: rrpc: cannot allocate write buffer for block\n");
> +		return NULL;
> +	}
> +
> +	entries = mempool_alloc(rrpc->block_pool, GFP_ATOMIC);
> +	if (!entries) {
> +		pr_err("nvm: rrpc: cannot allocate write buffer for block\n");
> +		mempool_free(data, rrpc->write_buf_pool);
> +		return NULL;
> +	}
> +
> +	/* TODO: Mempool? */

Not on fast path. I don't think we need mempools at all to take some
unnecessary memory.

> +	sync_bitmap = kmalloc(BITS_TO_LONGS(nentries) *
> +					sizeof(unsigned long), GFP_ATOMIC);
> +	if (!sync_bitmap) {
> +		mempool_free(data, rrpc->write_buf_pool);
> +		mempool_free(entries, rrpc->block_pool);
> +		return NULL;
> +	}
> +
> +	bitmap_zero(sync_bitmap, nentries);
>  
>  	spin_lock(&lun->lock);
>  	blk = nvm_get_blk_unlocked(rrpc->dev, rlun->parent, flags);
> @@ -192,43 +292,34 @@ static struct rrpc_block *rrpc_get_blk(struct rrpc *rrpc, struct rrpc_lun *rlun,
>  	}
>  
>  	rblk = &rlun->blocks[blk->id];
> -	list_add_tail(&rblk->list, &rlun->open_list);
> -	spin_unlock(&lun->lock);
>  
>  	blk->priv = rblk;
> -	bitmap_zero(rblk->invalid_pages, rrpc->dev->pgs_per_blk);
> +	bitmap_zero(rblk->invalid_pages, dev->pgs_per_blk);
>  	rblk->next_page = 0;
>  	rblk->nr_invalid_pages = 0;
> -	atomic_set(&rblk->data_cmnt_size, 0);
> +
> +	rblk->w_buf.data = data;
> +	rblk->w_buf.entries = entries;
> +	rblk->w_buf.sync_bitmap = sync_bitmap;
> +
> +	rblk->w_buf.entries->data  = rblk->w_buf.data;
> +	rblk->w_buf.mem = rblk->w_buf.entries;
> +	rblk->w_buf.subm = rblk->w_buf.entries;
> +	rblk->w_buf.nentries = nentries;
> +	rblk->w_buf.cur_mem = 0;
> +	rblk->w_buf.cur_subm = 0;
> +
> +	atomic_set(&rblk->w_buf.refs, 0);
> +
> +	spin_lock_init(&rblk->w_buf.w_lock);
> +	spin_lock_init(&rblk->w_buf.s_lock);
> +
> +	list_add_tail(&rblk->list, &rlun->open_list);
> +	spin_unlock(&lun->lock);
>  
>  	return rblk;
>  }
>  
> -static void rrpc_put_blk(struct rrpc *rrpc, struct rrpc_block *rblk)
> -{
> -	struct rrpc_lun *rlun = rblk->rlun;
> -	struct nvm_lun *lun = rlun->parent;
> -
> -	spin_lock(&lun->lock);
> -	nvm_put_blk_unlocked(rrpc->dev, rblk->parent);
> -	list_del(&rblk->list);
> -	spin_unlock(&lun->lock);
> -}
> -
> -static void rrpc_put_blks(struct rrpc *rrpc)
> -{
> -	struct rrpc_lun *rlun;
> -	int i;
> -
> -	for (i = 0; i < rrpc->nr_luns; i++) {
> -		rlun = &rrpc->luns[i];
> -		if (rlun->cur)
> -			rrpc_put_blk(rrpc, rlun->cur);
> -		if (rlun->gc_cur)
> -			rrpc_put_blk(rrpc, rlun->gc_cur);
> -	}
> -}
> -
>  static struct rrpc_lun *get_next_lun(struct rrpc *rrpc)
>  {
>  	int next = atomic_inc_return(&rrpc->next_lun);
> @@ -247,6 +338,17 @@ static void rrpc_gc_kick(struct rrpc *rrpc)
>  	}
>  }
>  
> +static void rrpc_writer_kick(struct rrpc *rrpc)
> +{
> +	struct rrpc_lun *rlun;
> +	unsigned int i;
> +
> +	for (i = 0; i < rrpc->nr_luns; i++) {
> +		rlun = &rrpc->luns[i];
> +		queue_work(rrpc->kw_wq, &rlun->ws_writer);
> +	}
> +}
> +
>  /*
>   * timed GC every interval.
>   */
> @@ -282,7 +384,7 @@ static int rrpc_move_valid_pages(struct rrpc *rrpc, struct rrpc_block *rblk)
>  {
>  	struct request_queue *q = rrpc->dev->q;
>  	struct rrpc_rev_addr *rev;
> -	struct nvm_rq *rqd;
> +	struct rrpc_rq *rrqd;
>  	struct bio *bio;
>  	struct page *page;
>  	int slot;
> @@ -306,7 +408,7 @@ static int rrpc_move_valid_pages(struct rrpc *rrpc, struct rrpc_block *rblk)
>  	}
>  
>  	while ((slot = find_first_zero_bit(rblk->invalid_pages,
> -					    nr_pgs_per_blk)) < nr_pgs_per_blk) {
> +					nr_pgs_per_blk)) < nr_pgs_per_blk) {

No need to fix the indentation here.

>  
>  		/* Lock laddr */
>  		phys_addr = (rblk->parent->id * nr_pgs_per_blk) + slot;
> @@ -321,8 +423,8 @@ try:
>  			continue;
>  		}
>  
> -		rqd = rrpc_inflight_laddr_acquire(rrpc, rev->addr, 1);
> -		if (IS_ERR_OR_NULL(rqd)) {
> +		rrqd = rrpc_inflight_laddr_acquire(rrpc, rev->addr, 1);
> +		if (IS_ERR_OR_NULL(rrqd)) {
>  			spin_unlock(&rrpc->rev_lock);
>  			schedule();
>  			goto try;
> @@ -339,14 +441,14 @@ try:
>  		/* TODO: may fail when EXP_PG_SIZE > PAGE_SIZE */
>  		bio_add_pc_page(q, bio, page, RRPC_EXPOSED_PAGE_SIZE, 0);
>  
> -		if (rrpc_submit_io(rrpc, bio, rqd, NVM_IOTYPE_GC)) {
> +		if (rrpc_submit_io(rrpc, bio, rrqd, NVM_IOTYPE_GC)) {
>  			pr_err("rrpc: gc read failed.\n");
> -			rrpc_inflight_laddr_release(rrpc, rqd);
> +			rrpc_inflight_laddr_release(rrpc, rrqd);
>  			goto finished;
>  		}
>  		wait_for_completion_io(&wait);
>  		if (bio->bi_error) {
> -			rrpc_inflight_laddr_release(rrpc, rqd);
> +			rrpc_inflight_laddr_release(rrpc, rrqd);
>  			goto finished;
>  		}
>  
> @@ -363,14 +465,16 @@ try:
>  		/* turn the command around and write the data back to a new
>  		 * address
>  		 */
> -		if (rrpc_submit_io(rrpc, bio, rqd, NVM_IOTYPE_GC)) {
> +		if (rrpc_submit_io(rrpc, bio, rrqd, NVM_IOTYPE_GC)
> +							!= NVM_IO_DONE) {
>  			pr_err("rrpc: gc write failed.\n");
> -			rrpc_inflight_laddr_release(rrpc, rqd);
> +			rrpc_inflight_laddr_release(rrpc, rrqd);
>  			goto finished;
>  		}
> +		bio_endio(bio);
>  		wait_for_completion_io(&wait);
>  
> -		rrpc_inflight_laddr_release(rrpc, rqd);
> +		/* rrpc_inflight_laddr_release(rrpc, rrqd); */

This is commented out.

>  		if (bio->bi_error)
>  			goto finished;
>  
> @@ -514,6 +618,8 @@ static void rrpc_gc_queue(struct work_struct *work)
>  	list_move_tail(&rblk->list, &rlun->closed_list);
>  	spin_unlock(&lun->lock);
>  
> +	rrpc_free_w_buffer(rrpc, rblk);
> +
>  	mempool_free(gcb, rrpc->gcb_pool);
>  	pr_debug("nvm: block '%lu' is full, allow GC (sched)\n",
>  							rblk->parent->id);
> @@ -663,43 +769,72 @@ static void rrpc_run_gc(struct rrpc *rrpc, struct rrpc_block *rblk)
>  	queue_work(rrpc->kgc_wq, &gcb->ws_gc);
>  }
>  
> -static void rrpc_end_io_write(struct rrpc *rrpc, struct rrpc_rq *rrqd,
> -						sector_t laddr, uint8_t npages)
> +static void rrpc_sync_buffer(struct rrpc *rrpc, struct rrpc_addr *p)
>  {
> -	struct rrpc_addr *p;
>  	struct rrpc_block *rblk;
> +	struct rrpc_w_buf *buf;
>  	struct nvm_lun *lun;
> -	int cmnt_size, i;
> +	unsigned long bppa;
>  
> -	for (i = 0; i < npages; i++) {
> -		p = &rrpc->trans_map[laddr + i];
> -		rblk = p->rblk;
> -		lun = rblk->parent->lun;
> +	rblk = p->rblk;
> +	buf = &rblk->w_buf;
> +	lun = rblk->parent->lun;
>  
> -		cmnt_size = atomic_inc_return(&rblk->data_cmnt_size);
> -		if (unlikely(cmnt_size == rrpc->dev->pgs_per_blk))
> -			rrpc_run_gc(rrpc, rblk);
> +	bppa = rrpc->dev->sec_per_blk * rblk->parent->id;
> +
> +	WARN_ON(test_and_set_bit((p->addr - bppa), buf->sync_bitmap));
> +
> +	if (unlikely(bitmap_full(buf->sync_bitmap, buf->nentries))) {
> +		/* Write buffer out-of-bounds */
> +		WARN_ON((buf->cur_mem != buf->nentries) &&
> +					(buf->cur_mem != buf->cur_subm));
> +
> +		rrpc_run_gc(rrpc, rblk);
> +	}
> +}
> +
> +static void rrpc_end_io_write(struct rrpc *rrpc, struct nvm_rq *rqd,
> +							uint8_t nr_pages)
> +{
> +	struct rrpc_buf_rq *brrqd = nvm_rq_to_pdu(rqd);
> +	struct rrpc_rq *rrqd;
> +	int i;
> +
> +	for (i = 0; i < nr_pages; i++) {
> +		rrqd = brrqd[i].rrqd;
> +		rrpc_sync_buffer(rrpc, brrqd[i].addr);
> +		kref_put(&rrqd->refs, rrpc_release_and_free_rrqd);
>  	}
> +
> +	mempool_free(brrqd, rrpc->m_rrq_pool);
> +	rrpc_writer_kick(rrpc);
> +}
> +
> +static void rrpc_end_io_read(struct rrpc *rrpc, struct nvm_rq *rqd,
> +							uint8_t nr_pages)
> +{
> +	struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
> +
> +	if (rqd->flags & NVM_IOTYPE_GC)
> +		return;
> +
> +	rrpc_unlock_rq(rrpc, rrqd);
> +	mempool_free(rrqd, rrpc->rrq_pool);
>  }
>  
>  static void rrpc_end_io(struct nvm_rq *rqd)
>  {
>  	struct rrpc *rrpc = container_of(rqd->ins, struct rrpc, instance);
> -	struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
> -	uint8_t npages = rqd->nr_pages;
> -	sector_t laddr = rrpc_get_laddr(rqd->bio) - npages;
> +	uint8_t nr_pages = rqd->nr_pages;
>  
>  	if (bio_data_dir(rqd->bio) == WRITE)
> -		rrpc_end_io_write(rrpc, rrqd, laddr, npages);
> +		rrpc_end_io_write(rrpc, rqd, nr_pages);
> +	else
> +		rrpc_end_io_read(rrpc, rqd, nr_pages);
>  
>  	bio_put(rqd->bio);
>  
> -	if (rrqd->flags & NVM_IOTYPE_GC)
> -		return;
> -
> -	rrpc_unlock_rq(rrpc, rqd);
> -
> -	if (npages > 1)
> +	if (nr_pages > 1)

Cleanup patches can go in afterwards or before.

>  		nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
>  	if (rqd->metadata)
>  		nvm_dev_dma_free(rrpc->dev, rqd->metadata, rqd->dma_metadata);
> @@ -708,20 +843,24 @@ static void rrpc_end_io(struct nvm_rq *rqd)
>  }
>  
>  static int rrpc_read_ppalist_rq(struct rrpc *rrpc, struct bio *bio,
> -			struct nvm_rq *rqd, unsigned long flags, int npages)
> +			struct nvm_rq *rqd, struct rrpc_buf_rq *brrqd,
> +			unsigned long flags, int nr_pages)
>  {
> -	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
> +	struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
> +	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
>  	struct rrpc_addr *gp;
>  	sector_t laddr = rrpc_get_laddr(bio);
>  	int is_gc = flags & NVM_IOTYPE_GC;
>  	int i;
>  
> -	if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd)) {
> +	if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
>  		nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
> +		mempool_free(rrqd, rrpc->rrq_pool);
> +		mempool_free(rqd, rrpc->rq_pool);

Seems like there is a relationship here between rrq_pool (rrpc_rq) and
rq_pool (nvm_rq)

To get to a single mempool alloc ( and just keep the nvm_rq mempool )



>  		return NVM_IO_REQUEUE;
>  	}
>  
> -	for (i = 0; i < npages; i++) {
> +	for (i = 0; i < nr_pages; i++) {
>  		/* We assume that mapping occurs at 4KB granularity */
>  		BUG_ON(!(laddr + i >= 0 && laddr + i < rrpc->nr_sects));
>  		gp = &rrpc->trans_map[laddr + i];
> @@ -734,8 +873,12 @@ static int rrpc_read_ppalist_rq(struct rrpc *rrpc, struct bio *bio,
>  			rrpc_unlock_laddr(rrpc, r);
>  			nvm_dev_dma_free(rrpc->dev, rqd->ppa_list,
>  							rqd->dma_ppa_list);
> +			mempool_free(rrqd, rrpc->rrq_pool);
> +			mempool_free(rqd, rrpc->rq_pool);
>  			return NVM_IO_DONE;
>  		}
> +
> +		brrqd[i].addr = gp;
>  	}
>  
>  	rqd->opcode = NVM_OP_HBREAD;
> @@ -751,8 +894,11 @@ static int rrpc_read_rq(struct rrpc *rrpc, struct bio *bio, struct nvm_rq *rqd,
>  	sector_t laddr = rrpc_get_laddr(bio);
>  	struct rrpc_addr *gp;
>  
> -	if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd))
> +	if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
> +		mempool_free(rrqd, rrpc->rrq_pool);
> +		mempool_free(rqd, rrpc->rq_pool);
>  		return NVM_IO_REQUEUE;
> +	}
>  
>  	BUG_ON(!(laddr >= 0 && laddr < rrpc->nr_sects));
>  	gp = &rrpc->trans_map[laddr];
> @@ -761,7 +907,9 @@ static int rrpc_read_rq(struct rrpc *rrpc, struct bio *bio, struct nvm_rq *rqd,
>  		rqd->ppa_addr = rrpc_ppa_to_gaddr(rrpc->dev, gp->addr);
>  	} else {
>  		BUG_ON(is_gc);
> -		rrpc_unlock_rq(rrpc, rqd);
> +		rrpc_unlock_rq(rrpc, rrqd);
> +		mempool_free(rrqd, rrpc->rrq_pool);
> +		mempool_free(rqd, rrpc->rq_pool);
>  		return NVM_IO_DONE;
>  	}
>  
> @@ -771,120 +919,190 @@ static int rrpc_read_rq(struct rrpc *rrpc, struct bio *bio, struct nvm_rq *rqd,
>  	return NVM_IO_OK;
>  }
>  
> +/*
> + * Copy data from current bio to block write buffer. This if necessary
> + * to guarantee durability if a flash block becomes bad before all pages
> + * are written. This buffer is also used to write at the right page
> + * granurality
> + */
> +static int rrpc_write_to_buffer(struct rrpc *rrpc, struct bio *bio,
> +				struct rrpc_rq *rrqd, struct rrpc_addr *addr,
> +				struct rrpc_w_buf *w_buf,
> +				unsigned long flags)
> +{
> +	struct nvm_dev *dev = rrpc->dev;
> +	unsigned int bio_len = bio_cur_bytes(bio);
> +
> +	if (bio_len != RRPC_EXPOSED_PAGE_SIZE)
> +		return NVM_IO_ERR;
> +
> +	spin_lock(&w_buf->w_lock);
> +
> +	WARN_ON(w_buf->cur_mem == w_buf->nentries);
> +
> +	w_buf->mem->rrqd = rrqd;
> +	w_buf->mem->addr = addr;
> +	w_buf->mem->flags = flags;
> +
> +	memcpy(w_buf->mem->data, bio_data(bio), bio_len);
> +
> +	w_buf->cur_mem++;
> +	if (likely(w_buf->cur_mem < w_buf->nentries)) {
> +		w_buf->mem++;
> +		w_buf->mem->data =
> +				w_buf->data + (w_buf->cur_mem * dev->sec_size);
> +	}
> +
> +	spin_unlock(&w_buf->w_lock);
> +
> +	return 0;
> +}
> +
>  static int rrpc_write_ppalist_rq(struct rrpc *rrpc, struct bio *bio,
> -			struct nvm_rq *rqd, unsigned long flags, int npages)
> +			struct rrpc_rq *rrqd, unsigned long flags, int nr_pages)
>  {
> -	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
> +	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
> +	struct rrpc_w_buf *w_buf;
>  	struct rrpc_addr *p;
> +	struct rrpc_lun *rlun;
>  	sector_t laddr = rrpc_get_laddr(bio);
>  	int is_gc = flags & NVM_IOTYPE_GC;
> +	int err;
>  	int i;
>  
> -	if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd)) {
> -		nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
> +	if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
> +		mempool_free(rrqd, rrpc->rrq_pool);
>  		return NVM_IO_REQUEUE;
>  	}
>  
> -	for (i = 0; i < npages; i++) {
> +	for (i = 0; i < nr_pages; i++) {
> +		kref_get(&rrqd->refs);
> +
>  		/* We assume that mapping occurs at 4KB granularity */
>  		p = rrpc_map_page(rrpc, laddr + i, is_gc);
>  		if (!p) {
>  			BUG_ON(is_gc);
>  			rrpc_unlock_laddr(rrpc, r);
> -			nvm_dev_dma_free(rrpc->dev, rqd->ppa_list,
> -							rqd->dma_ppa_list);
> +			mempool_free(rrqd, rrpc->rrq_pool);
>  			rrpc_gc_kick(rrpc);
>  			return NVM_IO_REQUEUE;
>  		}
>  
> -		rqd->ppa_list[i] = rrpc_ppa_to_gaddr(rrpc->dev,
> -								p->addr);
> +		w_buf = &p->rblk->w_buf;
> +		rlun = p->rblk->rlun;
> +
> +		rrqd->addr = p;
> +
> +		err = rrpc_write_to_buffer(rrpc, bio, rrqd, p, w_buf, flags);
> +		if (err) {
> +			pr_err("rrpc: could not write to write buffer\n");
> +			return err;
> +		}
> +
> +		bio_advance(bio, RRPC_EXPOSED_PAGE_SIZE);
> +
> +		queue_work(rrpc->kw_wq, &rlun->ws_writer);

How about a timer and only the kick when a flush command is sent?

>  	}
>  
> -	rqd->opcode = NVM_OP_HBWRITE;
> +	if (kref_put(&rrqd->refs, rrpc_release_and_free_rrqd)) {
> +		pr_err("rrpc: request reference counter dailed\n");
> +		return NVM_IO_ERR;
> +	}
>  
> -	return NVM_IO_OK;
> +	return NVM_IO_DONE;
>  }
>  
>  static int rrpc_write_rq(struct rrpc *rrpc, struct bio *bio,
> -				struct nvm_rq *rqd, unsigned long flags)
> +				struct rrpc_rq *rrqd, unsigned long flags)
>  {
> -	struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
> +	struct rrpc_w_buf *w_buf;
>  	struct rrpc_addr *p;
> +	struct rrpc_lun *rlun;
>  	int is_gc = flags & NVM_IOTYPE_GC;
> +	int err;
>  	sector_t laddr = rrpc_get_laddr(bio);
>  
> -	if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd))
> +	if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
> +		mempool_free(rrqd, rrpc->rrq_pool);
>  		return NVM_IO_REQUEUE;
> +	}
>  
>  	p = rrpc_map_page(rrpc, laddr, is_gc);
>  	if (!p) {
>  		BUG_ON(is_gc);
> -		rrpc_unlock_rq(rrpc, rqd);
> +		rrpc_unlock_rq(rrpc, rrqd);
> +		mempool_free(rrqd, rrpc->rrq_pool);
>  		rrpc_gc_kick(rrpc);
>  		return NVM_IO_REQUEUE;
>  	}
>  
> -	rqd->ppa_addr = rrpc_ppa_to_gaddr(rrpc->dev, p->addr);
> -	rqd->opcode = NVM_OP_HBWRITE;
> +	w_buf = &p->rblk->w_buf;
> +	rlun = p->rblk->rlun;
> +
>  	rrqd->addr = p;
>  
> -	return NVM_IO_OK;
> +	err = rrpc_write_to_buffer(rrpc, bio, rrqd, p, w_buf, flags);
> +	if (err) {
> +		pr_err("rrpc: could not write to write buffer\n");
> +		return err;
> +	}
> +
> +	queue_work(rrpc->kw_wq, &rlun->ws_writer);
> +	return NVM_IO_DONE;
>  }
>  
> -static int rrpc_setup_rq(struct rrpc *rrpc, struct bio *bio,
> -			struct nvm_rq *rqd, unsigned long flags, uint8_t npages)
> +static int rrpc_submit_read(struct rrpc *rrpc, struct bio *bio,
> +				struct rrpc_rq *rrqd, unsigned long flags)
>  {
> -	if (npages > 1) {
> +	struct nvm_rq *rqd;
> +	struct rrpc_buf_rq brrqd[rrpc->max_write_pgs];
> +	uint8_t nr_pages = rrpc_get_pages(bio);
> +	int err;
> +
> +	rqd = mempool_alloc(rrpc->rq_pool, GFP_KERNEL);
> +	if (!rqd) {
> +		pr_err_ratelimited("rrpc: not able to queue bio.");
> +		bio_io_error(bio);
> +		return NVM_IO_ERR;
> +	}
> +	rqd->metadata = NULL;
> +	rqd->priv = rrqd;
> +
> +	if (nr_pages > 1) {
>  		rqd->ppa_list = nvm_dev_dma_alloc(rrpc->dev, GFP_KERNEL,
> -							&rqd->dma_ppa_list);
> +						&rqd->dma_ppa_list);
>  		if (!rqd->ppa_list) {
>  			pr_err("rrpc: not able to allocate ppa list\n");
> +			mempool_free(rrqd, rrpc->rrq_pool);
> +			mempool_free(rqd, rrpc->rq_pool);
>  			return NVM_IO_ERR;
>  		}
>  
> -		if (bio_rw(bio) == WRITE)
> -			return rrpc_write_ppalist_rq(rrpc, bio, rqd, flags,
> -									npages);
> -
> -		return rrpc_read_ppalist_rq(rrpc, bio, rqd, flags, npages);
> +		err = rrpc_read_ppalist_rq(rrpc, bio, rqd, brrqd, flags,
> +								nr_pages);
> +		if (err) {
> +			mempool_free(rrqd, rrpc->rrq_pool);
> +			mempool_free(rqd, rrpc->rq_pool);
> +			return err;
> +		}
> +	} else {
> +		err = rrpc_read_rq(rrpc, bio, rqd, flags);
> +		if (err)
> +			return err;
>  	}
>  
> -	if (bio_rw(bio) == WRITE)
> -		return rrpc_write_rq(rrpc, bio, rqd, flags);
> -
> -	return rrpc_read_rq(rrpc, bio, rqd, flags);
> -}
> -
> -static int rrpc_submit_io(struct rrpc *rrpc, struct bio *bio,
> -				struct nvm_rq *rqd, unsigned long flags)
> -{
> -	int err;
> -	struct rrpc_rq *rrq = nvm_rq_to_pdu(rqd);
> -	uint8_t nr_pages = rrpc_get_pages(bio);
> -	int bio_size = bio_sectors(bio) << 9;
> -
> -	if (bio_size < rrpc->dev->sec_size)
> -		return NVM_IO_ERR;
> -	else if (bio_size > rrpc->dev->max_rq_size)
> -		return NVM_IO_ERR;
> -
> -	err = rrpc_setup_rq(rrpc, bio, rqd, flags, nr_pages);
> -	if (err)
> -		return err;
> -
>  	bio_get(bio);
>  	rqd->bio = bio;
>  	rqd->ins = &rrpc->instance;
> -	rqd->nr_pages = nr_pages;
> -	rrq->flags = flags;
> +	rqd->nr_pages = rrqd->nr_pages = nr_pages;
> +	rqd->flags = flags;
>  
>  	err = nvm_submit_io(rrpc->dev, rqd);
>  	if (err) {
>  		pr_err("rrpc: I/O submission failed: %d\n", err);
>  		bio_put(bio);
>  		if (!(flags & NVM_IOTYPE_GC)) {
> -			rrpc_unlock_rq(rrpc, rqd);
> +			rrpc_unlock_rq(rrpc, rrqd);
>  			if (rqd->nr_pages > 1)
>  				nvm_dev_dma_free(rrpc->dev,
>  			rqd->ppa_list, rqd->dma_ppa_list);
> @@ -895,10 +1113,39 @@ static int rrpc_submit_io(struct rrpc *rrpc, struct bio *bio,
>  	return NVM_IO_OK;
>  }
>  
> +static int rrpc_buffer_write(struct rrpc *rrpc, struct bio *bio,
> +				struct rrpc_rq *rrqd, unsigned long flags)
> +{
> +	uint8_t nr_pages = rrpc_get_pages(bio);
> +
> +	rrqd->nr_pages = nr_pages;
> +
> +	if (nr_pages > 1)
> +		return rrpc_write_ppalist_rq(rrpc, bio, rrqd, flags, nr_pages);
> +	else

You can skip the else here.

> +		return rrpc_write_rq(rrpc, bio, rrqd, flags);
> +}
> +
> +static int rrpc_submit_io(struct rrpc *rrpc, struct bio *bio,
> +				struct rrpc_rq *rrqd, unsigned long flags)
> +{
> +	int bio_size = bio_sectors(bio) << 9;
> +
> +	if (bio_size < rrpc->dev->sec_size)
> +		return NVM_IO_ERR;
> +	else if (bio_size > rrpc->dev->max_rq_size)
> +		return NVM_IO_ERR;

I have a patch incoming that removes this. Properly making
rrpc_submit_io obsolete.

> +
> +	if (bio_rw(bio) == READ)
> +		return rrpc_submit_read(rrpc, bio, rrqd, flags);
> +
> +	return rrpc_buffer_write(rrpc, bio, rrqd, flags);
> +}
> +
>  static blk_qc_t rrpc_make_rq(struct request_queue *q, struct bio *bio)
>  {
>  	struct rrpc *rrpc = q->queuedata;
> -	struct nvm_rq *rqd;
> +	struct rrpc_rq *rrqd;
>  	int err;
>  
>  	if (bio->bi_rw & REQ_DISCARD) {
> @@ -906,15 +1153,15 @@ static blk_qc_t rrpc_make_rq(struct request_queue *q, struct bio *bio)
>  		return BLK_QC_T_NONE;
>  	}
>  
> -	rqd = mempool_alloc(rrpc->rq_pool, GFP_KERNEL);
> -	if (!rqd) {
> -		pr_err_ratelimited("rrpc: not able to queue bio.");
> +	rrqd = mempool_alloc(rrpc->rrq_pool, GFP_KERNEL);
> +	if (!rrqd) {
> +		pr_err_ratelimited("rrpc: not able to allocate rrqd.");
>  		bio_io_error(bio);
>  		return BLK_QC_T_NONE;
>  	}
> -	memset(rqd, 0, sizeof(struct nvm_rq));

It seems like we don't use nvm_rq in the write patch? (the writer thread
will do the write itself). Move this inside read? and use the original
nvm_rq for submission?

> +	kref_init(&rrqd->refs);
>  
> -	err = rrpc_submit_io(rrpc, bio, rqd, NVM_IOTYPE_NONE);
> +	err = rrpc_submit_io(rrpc, bio, rrqd, NVM_IOTYPE_NONE);
>  	switch (err) {
>  	case NVM_IO_OK:
>  		return BLK_QC_T_NONE;
> @@ -932,10 +1179,221 @@ static blk_qc_t rrpc_make_rq(struct request_queue *q, struct bio *bio)
>  		break;
>  	}
>  
> -	mempool_free(rqd, rrpc->rq_pool);
>  	return BLK_QC_T_NONE;
>  }
>  
> +static int rrpc_alloc_page_in_bio(struct rrpc *rrpc, struct bio *bio,
> +								void *data)
> +{
> +	struct page *page;
> +	int err;
> +
> +	if (PAGE_SIZE != RRPC_EXPOSED_PAGE_SIZE)
> +		return -1;

return -EINVAL?

This doesn't work on power and sparch architectures (with larger
PAGE_SIZEs). Would be great to handle that as well.

> +
> +	page = virt_to_page(data);
> +	if (!page) {
> +		pr_err("nvm: rrpc: could not alloc page\n");
> +		return -1;
> +	}
> +
> +	err = bio_add_page(bio, page, RRPC_EXPOSED_PAGE_SIZE, 0);
> +	if (err != RRPC_EXPOSED_PAGE_SIZE) {
> +		pr_err("nvm: rrpc: could not add page to bio\n");
> +		return -1;
> +	}
> +
> +	return 0;
> +}
> +
> +static void rrpc_submit_write(struct work_struct *work)
> +{
> +	struct rrpc_lun *rlun = container_of(work, struct rrpc_lun, ws_writer);
> +	struct rrpc *rrpc = rlun->rrpc;
> +	struct nvm_dev *dev = rrpc->dev;
> +	struct rrpc_addr *addr;
> +	struct rrpc_rq *rrqd;
> +	struct rrpc_buf_rq *brrqd;
> +	void *data;
> +	struct nvm_rq *rqd;
> +	struct rrpc_block *rblk;
> +	struct bio *bio;
> +	int pgs_to_sync, pgs_avail;
> +	int sync = NVM_SYNC_HARD;
> +	int err;
> +	int i;
> +
> +	/* Note that OS pages are typically mapped to flash page sectors, which
> +	 * are 4K; a flash page might be formed of several sectors. Also,
> +	 * controllers typically program flash pages across multiple planes.
> +	 * This is the flash programing granurality, and the reason behind the
> +	 * sync strategy performed in this write thread.
> +	 */
> +try:
> +	spin_lock(&rlun->parent->lock);
> +	list_for_each_entry(rblk, &rlun->open_list, list) {
> +		if (!spin_trylock(&rblk->w_buf.w_lock))
> +			continue;
> +
> +		/* If the write thread has already submitted all I/Os in the
> +		 * write buffer for this block ignore that the block is in the
> +		 * open list; it is on its way to the closed list. This enables
> +		 * us to avoid taking a lock on the list.
> +		 */
> +		if (unlikely(rblk->w_buf.cur_subm == rblk->w_buf.nentries)) {
> +			spin_unlock(&rblk->w_buf.w_lock);
> +			spin_unlock(&rlun->parent->lock);
> +			schedule();
> +			goto try;
> +		}
> +		pgs_avail = rblk->w_buf.cur_mem - rblk->w_buf.cur_subm;
> +
> +		switch (sync) {
> +		case NVM_SYNC_SOFT:
> +			pgs_to_sync = (pgs_avail >= rrpc->max_write_pgs) ?
> +					rrpc->max_write_pgs : 0;
> +			break;
> +		case NVM_SYNC_HARD:
> +			if (pgs_avail >= rrpc->max_write_pgs)
> +				pgs_to_sync = rrpc->max_write_pgs;
> +			else if (pgs_avail >= rrpc->min_write_pgs)
> +				pgs_to_sync = rrpc->min_write_pgs *
> +					(pgs_avail / rrpc->min_write_pgs);
> +			else
> +				pgs_to_sync = pgs_avail; /* TODO: ADD PADDING */
> +			break;
> +		case NVM_SYNC_OPORT:
> +			if (pgs_avail >= rrpc->max_write_pgs)
> +				pgs_to_sync = rrpc->max_write_pgs;
> +			else if (pgs_avail >= rrpc->min_write_pgs)
> +				pgs_to_sync = rrpc->min_write_pgs *
> +					(pgs_avail / rrpc->min_write_pgs);
> +			else
> +				pgs_to_sync = 0;
> +		}
> +
> +		if (pgs_to_sync == 0) {
> +			spin_unlock(&rblk->w_buf.w_lock);
> +			continue;
> +		}
> +
> +		bio = bio_alloc(GFP_ATOMIC, pgs_to_sync);
> +		if (!bio) {
> +			pr_err("nvm: rrpc: could not alloc write bio\n");
> +			goto out1;
> +		}
> +
> +		rqd = mempool_alloc(rrpc->rq_pool, GFP_ATOMIC);
> +		if (!rqd) {
> +			pr_err_ratelimited("rrpc: not able to create w req.");
> +			goto out2;
> +		}
> +		rqd->metadata = NULL;
> +
> +		brrqd = mempool_alloc(rrpc->m_rrq_pool, GFP_ATOMIC);
> +		if (!brrqd) {
> +			pr_err_ratelimited("rrpc: not able to create w rea.");
> +			goto out3;
> +		}
> +
> +		bio->bi_iter.bi_sector = 0; /* artificial bio */
> +		bio->bi_rw = WRITE;
> +
> +		rqd->opcode = NVM_OP_HBWRITE;
> +		rqd->bio = bio;
> +		rqd->ins = &rrpc->instance;
> +		rqd->nr_pages = pgs_to_sync;
> +		rqd->priv = brrqd;
> +
> +		if (pgs_to_sync == 1) {
> +			rrqd = rblk->w_buf.subm->rrqd;
> +			addr = rblk->w_buf.subm->addr;
> +			rqd->flags = rblk->w_buf.subm->flags;
> +			data = rblk->w_buf.subm->data;
> +
> +			err = rrpc_alloc_page_in_bio(rrpc, bio, data);
> +			if (err) {
> +				pr_err("rrpc: cannot allocate page in bio\n");
> +				goto out4;
> +			}
> +
> +			/* TODO: This address can be skipped */
> +			if (addr->addr == ADDR_EMPTY)
> +				pr_err_ratelimited("rrpc: submitting empty rq");
> +
> +			rqd->ppa_addr = rrpc_ppa_to_gaddr(dev, addr->addr);
> +
> +			brrqd[0].rrqd = rrqd;
> +			brrqd[0].addr = addr;
> +
> +			rblk->w_buf.subm++;
> +			rblk->w_buf.cur_subm++;
> +
> +			goto submit_io;
> +		}
> +
> +		/* This bio will contain several pppas */
> +		rqd->ppa_list = nvm_dev_dma_alloc(rrpc->dev, GFP_ATOMIC,
> +							&rqd->dma_ppa_list);
> +		if (!rqd->ppa_list) {
> +			pr_err("rrpc: not able to allocate ppa list\n");
> +			goto out4;
> +		}
> +
> +		for (i = 0; i < pgs_to_sync; i++) {
> +			rrqd = rblk->w_buf.subm->rrqd;
> +			addr = rblk->w_buf.subm->addr;
> +			rqd->flags = rblk->w_buf.subm->flags;
> +			data = rblk->w_buf.subm->data;
> +
> +			err = rrpc_alloc_page_in_bio(rrpc, bio, data);
> +			if (err) {
> +				pr_err("rrpc: cannot allocate page in bio\n");
> +				goto out5;
> +			}
> +
> +			/* TODO: This address can be skipped */
> +			if (addr->addr == ADDR_EMPTY)
> +				pr_err_ratelimited("rrpc: submitting empty rq");
> +
> +			rqd->ppa_list[i] = rrpc_ppa_to_gaddr(dev, addr->addr);
> +
> +			brrqd[i].rrqd = rrqd;
> +			brrqd[i].addr = addr;
> +
> +			rblk->w_buf.subm++;
> +			rblk->w_buf.cur_subm++;
> +		}
> +
> +submit_io:
> +		WARN_ON(rblk->w_buf.cur_subm > rblk->w_buf.nentries);
> +
> +		spin_unlock(&rblk->w_buf.w_lock);
> +
> +		err = nvm_submit_io(dev, rqd);
> +		if (err) {
> +			pr_err("rrpc: I/O submission failed: %d\n", err);
> +			mempool_free(rqd, rrpc->rq_pool);
> +			bio_put(bio);
> +		}
> +	}
> +
> +	spin_unlock(&rlun->parent->lock);
> +	return;
> +
> +out5:
> +	nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
> +out4:
> +	mempool_free(brrqd, rrpc->m_rrq_pool);
> +out3:
> +	mempool_free(rqd, rrpc->rq_pool);
> +out2:
> +	bio_put(bio);
> +out1:
> +	spin_unlock(&rblk->w_buf.w_lock);
> +	spin_unlock(&rlun->parent->lock);
> +}
> +

The function can at least be split up in two parts. The what to sync and
the actual write. The ret* labels should be replaced with more sane names.

>  static void rrpc_requeue(struct work_struct *work)
>  {
>  	struct rrpc *rrpc = container_of(work, struct rrpc, ws_requeue);
> @@ -978,7 +1436,7 @@ static void rrpc_gc_free(struct rrpc *rrpc)
>  
>  static int rrpc_gc_init(struct rrpc *rrpc)
>  {
> -	rrpc->krqd_wq = alloc_workqueue("rrpc-lun", WQ_MEM_RECLAIM|WQ_UNBOUND,
> +	rrpc->krqd_wq = alloc_workqueue("rrpc-lun", WQ_MEM_RECLAIM | WQ_UNBOUND,

No need for reformatting

>  								rrpc->nr_luns);
>  	if (!rrpc->krqd_wq)
>  		return -ENOMEM;
> @@ -1080,6 +1538,8 @@ static int rrpc_map_init(struct rrpc *rrpc)
>  
>  static int rrpc_core_init(struct rrpc *rrpc)
>  {
> +	struct nvm_dev *dev = rrpc->dev;
> +
>  	down_write(&rrpc_lock);
>  	if (!rrpc_gcb_cache) {
>  		rrpc_gcb_cache = kmem_cache_create("rrpc_gcb",
> @@ -1089,14 +1549,70 @@ static int rrpc_core_init(struct rrpc *rrpc)
>  			return -ENOMEM;
>  		}
>  
> -		rrpc_rq_cache = kmem_cache_create("rrpc_rq",
> -				sizeof(struct nvm_rq) + sizeof(struct rrpc_rq),
> -				0, 0, NULL);
> +		rrpc_rq_cache = kmem_cache_create("nvm_rq",
> +					sizeof(struct nvm_rq), 0, 0, NULL);
>  		if (!rrpc_rq_cache) {
>  			kmem_cache_destroy(rrpc_gcb_cache);
>  			up_write(&rrpc_lock);
>  			return -ENOMEM;
>  		}
> +
> +		rrpc_rrq_cache = kmem_cache_create("rrpc_rrq",
> +					sizeof(struct rrpc_rq), 0, 0, NULL);
> +		if (!rrpc_rrq_cache) {
> +			kmem_cache_destroy(rrpc_gcb_cache);
> +			kmem_cache_destroy(rrpc_rq_cache);
> +			up_write(&rrpc_lock);
> +			return -ENOMEM;
> +		}
> +
> +		rrpc_buf_rrq_cache = kmem_cache_create("rrpc_m_rrq",
> +			rrpc->max_write_pgs * sizeof(struct rrpc_buf_rq),
> +			0, 0, NULL);
> +		if (!rrpc_buf_rrq_cache) {
> +			kmem_cache_destroy(rrpc_gcb_cache);
> +			kmem_cache_destroy(rrpc_rq_cache);
> +			kmem_cache_destroy(rrpc_rrq_cache);
> +			up_write(&rrpc_lock);
> +			return -ENOMEM;
> +		}
> +	}
> +
> +	/* we assume that sec->sec_size is the same as the page size exposed by
> +	 * rrpc (4KB). We need extra logic otherwise
> +	 */
> +	if (!rrpc_block_cache) {
> +		/* Write buffer: Allocate all buffer (for all block) at once. We
> +		 * avoid having to allocate a memory from the pool for each IO
> +		 * at the cost pre-allocating memory for the whole block when a
> +		 * new block is allocated from the media manager.
> +		 */
> +		rrpc_wb_cache = kmem_cache_create("nvm_wb",
> +			dev->pgs_per_blk * dev->sec_per_pg * dev->sec_size,
> +			0, 0, NULL);

Seems a bit excessive to allocate a mempool for this. Why not do a
vmalloc for when the memory is needed? The normal case is 4MB? per
entry, and the cache creates 8 of them in the pool, using 64MB out of
the gate. We are not in a hot path at this point in time, so lets not
waste the resource when we might not use them.

> +		if (!rrpc_wb_cache) {
> +			kmem_cache_destroy(rrpc_gcb_cache);
> +			kmem_cache_destroy(rrpc_rq_cache);
> +			kmem_cache_destroy(rrpc_rrq_cache);
> +			kmem_cache_destroy(rrpc_buf_rrq_cache);
> +			up_write(&rrpc_lock);
> +			return -ENOMEM;
> +		}
> +
> +		/* Write buffer entries */
> +		rrpc_block_cache = kmem_cache_create("nvm_entry",
> +			dev->pgs_per_blk * dev->sec_per_pg *
> +			sizeof(struct buf_entry),
> +			0, 0, NULL);
> +		if (!rrpc_block_cache) {
> +			kmem_cache_destroy(rrpc_gcb_cache);
> +			kmem_cache_destroy(rrpc_rq_cache);
> +			kmem_cache_destroy(rrpc_rrq_cache);
> +			kmem_cache_destroy(rrpc_buf_rrq_cache);
> +			kmem_cache_destroy(rrpc_wb_cache);
> +			up_write(&rrpc_lock);
> +			return -ENOMEM;
> +		}
>  	}
>  	up_write(&rrpc_lock);
>  
> @@ -1113,17 +1629,45 @@ static int rrpc_core_init(struct rrpc *rrpc)
>  	if (!rrpc->rq_pool)
>  		return -ENOMEM;
>  
> +	rrpc->rrq_pool = mempool_create_slab_pool(64, rrpc_rrq_cache);
> +	if (!rrpc->rrq_pool)
> +		return -ENOMEM;
> +
> +	rrpc->m_rrq_pool = mempool_create_slab_pool(64, rrpc_buf_rrq_cache);
> +	if (!rrpc->m_rrq_pool)
> +		return -ENOMEM;
> +
> +	rrpc->block_pool = mempool_create_slab_pool(8, rrpc_block_cache);
> +	if (!rrpc->block_pool)
> +		return -ENOMEM;
> +
> +	rrpc->write_buf_pool = mempool_create_slab_pool(8, rrpc_wb_cache);
> +	if (!rrpc->write_buf_pool)
> +		return -ENOMEM;

Alot of pools. I hope the nvm_rq, rrpc_rq, and rrpc_buf_rq consolidation
can move it back to just a nvm_rq entry)

> +
>  	spin_lock_init(&rrpc->inflights.lock);
>  	INIT_LIST_HEAD(&rrpc->inflights.reqs);
>  
> +	rrpc->kw_wq = alloc_workqueue("rrpc-writer",
> +				WQ_MEM_RECLAIM | WQ_UNBOUND, rrpc->nr_luns);
> +	if (!rrpc->kw_wq)
> +		return -ENOMEM;
> +
>  	return 0;
>  }
>  
>  static void rrpc_core_free(struct rrpc *rrpc)
>  {
> +	if (rrpc->kw_wq)
> +		destroy_workqueue(rrpc->kw_wq);
> +
>  	mempool_destroy(rrpc->page_pool);
>  	mempool_destroy(rrpc->gcb_pool);
> +	mempool_destroy(rrpc->m_rrq_pool);
> +	mempool_destroy(rrpc->rrq_pool);
>  	mempool_destroy(rrpc->rq_pool);
> +	mempool_destroy(rrpc->block_pool);
> +	mempool_destroy(rrpc->write_buf_pool);
>  }
>  
>  static void rrpc_luns_free(struct rrpc *rrpc)
> @@ -1164,6 +1708,8 @@ static int rrpc_luns_init(struct rrpc *rrpc, int lun_begin, int lun_end)
>  		INIT_LIST_HEAD(&rlun->open_list);
>  		INIT_LIST_HEAD(&rlun->closed_list);
>  
> +		INIT_WORK(&rlun->ws_writer, rrpc_submit_write);
> +
>  		INIT_WORK(&rlun->ws_gc, rrpc_lun_gc);
>  		spin_lock_init(&rlun->lock);
>  
> @@ -1209,6 +1755,7 @@ static void rrpc_exit(void *private)
>  
>  	flush_workqueue(rrpc->krqd_wq);
>  	flush_workqueue(rrpc->kgc_wq);
> +	/* flush_workqueue(rrpc->kw_wq); */ /* TODO: Implement flush + padding*/

This can come in an additional patch.

>  
>  	rrpc_free(rrpc);
>  }
> diff --git a/drivers/lightnvm/rrpc.h b/drivers/lightnvm/rrpc.h
> index 868e91a..6e188b4 100644
> --- a/drivers/lightnvm/rrpc.h
> +++ b/drivers/lightnvm/rrpc.h
> @@ -49,17 +49,67 @@ struct rrpc_inflight_rq {
>  struct rrpc_rq {
>  	struct rrpc_inflight_rq inflight_rq;
>  	struct rrpc_addr *addr;
> +	int nr_pages;
> +
> +	struct kref refs;

I think what you really want to do here is to add the ref counter to
rrpc_inflight_addr and then have that maintain it to when to free.
(also, name it kref)

> +};
> +
> +struct rrpc_buf_rq {
> +	struct rrpc_addr *addr;
> +	struct rrpc_rq *rrqd;

I'm not sure why we keep a rrpc_buf_rq->addr and rrpc_rq->addr?

> +};
> +
> +/* Sync strategies from write buffer to media */
> +enum {
> +	NVM_SYNC_SOFT	= 0x0,		/* Only submit at max_write_pgs
> +					 * supported by the device. Typically 64
> +					 * pages (256k).
> +					 */
> +	NVM_SYNC_HARD	= 0x1,		/* Submit the whole buffer. Add padding
> +					 * if necessary to respect the device's
> +					 * min_write_pgs.
> +					 */
> +	NVM_SYNC_OPORT	= 0x2,		/* Submit what we can, always respecting
> +					 * the device's min_write_pgs.
> +					 */

That is great. This should properly be exposed as a sysfs option later.
> +};
> +
> +struct buf_entry {
> +	struct rrpc_rq *rrqd;
> +	void *data;
> +	struct rrpc_addr *addr;
>  	unsigned long flags;
>  };
>  
> +struct rrpc_w_buf {
> +	struct buf_entry *entries;	/* Entries */
> +	struct buf_entry *mem;		/* Points to the next writable entry */
> +	struct buf_entry *subm;		/* Points to the last submitted entry */
> +	int cur_mem;			/* Current memory entry. Follows mem */
> +	int cur_subm;			/* Entries have been submitted to dev */
> +	int nentries;			/* Number of entries in write buffer */

nr_entries?

> +
> +	void *data;			/* Actual data */
> +	unsigned long *sync_bitmap;	/* Bitmap representing physical
> +					 * addresses that have been synced to
> +					 * the media
> +					 */
> +
> +	atomic_t refs;

kref? semaphore?

> +
> +	spinlock_t w_lock;
> +	spinlock_t s_lock;

is it short for submission lock?

> +};
> +
>  struct rrpc_block {
>  	struct nvm_block *parent;
>  	struct rrpc_lun *rlun;
>  	struct list_head prio;
>  	struct list_head list;
> +	struct rrpc_w_buf w_buf;
>  
>  #define MAX_INVALID_PAGES_STORAGE 8
> -	/* Bitmap for invalid page intries */
> +	/* Bitmap for invalid page entries */
>  	unsigned long invalid_pages[MAX_INVALID_PAGES_STORAGE];
>  	/* points to the next writable page within a block */
>  	unsigned int next_page;
> @@ -67,7 +117,6 @@ struct rrpc_block {
>  	unsigned int nr_invalid_pages;
>  
>  	spinlock_t lock;
> -	atomic_t data_cmnt_size; /* data pages committed to stable storage */
>  };
>  
>  struct rrpc_lun {
> @@ -86,6 +135,7 @@ struct rrpc_lun {
>  					 */
>  
>  	struct work_struct ws_gc;
> +	struct work_struct ws_writer;
>  
>  	spinlock_t lock;
>  };
> @@ -136,10 +186,15 @@ struct rrpc {
>  	mempool_t *page_pool;
>  	mempool_t *gcb_pool;
>  	mempool_t *rq_pool;
> +	mempool_t *rrq_pool;
> +	mempool_t *m_rrq_pool;
> +	mempool_t *block_pool;
> +	mempool_t *write_buf_pool;
>  
>  	struct timer_list gc_timer;
>  	struct workqueue_struct *krqd_wq;
>  	struct workqueue_struct *kgc_wq;
> +	struct workqueue_struct *kw_wq;
>  };
>  
>  struct rrpc_block_gc {
> @@ -181,7 +236,7 @@ static inline int request_intersects(struct rrpc_inflight_rq *r,
>  }
>  
>  static int __rrpc_lock_laddr(struct rrpc *rrpc, sector_t laddr,
> -			     unsigned pages, struct rrpc_inflight_rq *r)
> +				unsigned pages, struct rrpc_inflight_rq *r)
>  {
>  	sector_t laddr_end = laddr + pages - 1;
>  	struct rrpc_inflight_rq *rtmp;
> @@ -206,27 +261,26 @@ static int __rrpc_lock_laddr(struct rrpc *rrpc, sector_t laddr,
>  }
>  
>  static inline int rrpc_lock_laddr(struct rrpc *rrpc, sector_t laddr,
> -				 unsigned pages,
> -				 struct rrpc_inflight_rq *r)
> +				unsigned pages,
> +				struct rrpc_inflight_rq *r)
>  {
>  	BUG_ON((laddr + pages) > rrpc->nr_sects);
>  
>  	return __rrpc_lock_laddr(rrpc, laddr, pages, r);
>  }
>  
> -static inline struct rrpc_inflight_rq *rrpc_get_inflight_rq(struct nvm_rq *rqd)
> +static inline struct rrpc_inflight_rq
> +				*rrpc_get_inflight_rq(struct rrpc_rq *rrqd)
>  {
> -	struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
> -
>  	return &rrqd->inflight_rq;
>  }
>  
>  static inline int rrpc_lock_rq(struct rrpc *rrpc, struct bio *bio,
> -							struct nvm_rq *rqd)
> +							struct rrpc_rq *rrqd)
>  {
>  	sector_t laddr = rrpc_get_laddr(bio);
>  	unsigned int pages = rrpc_get_pages(bio);
> -	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
> +	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
>  
>  	return rrpc_lock_laddr(rrpc, laddr, pages, r);
>  }
> @@ -241,12 +295,12 @@ static inline void rrpc_unlock_laddr(struct rrpc *rrpc,
>  	spin_unlock_irqrestore(&rrpc->inflights.lock, flags);
>  }
>  
> -static inline void rrpc_unlock_rq(struct rrpc *rrpc, struct nvm_rq *rqd)
> +static inline void rrpc_unlock_rq(struct rrpc *rrpc, struct rrpc_rq *rrqd)
>  {
> -	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
> -	uint8_t pages = rqd->nr_pages;
> +	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
> +	uint8_t nr_pages = rrqd->nr_pages;

The cleanups can be moved to another patch.

>  
> -	BUG_ON((r->l_start + pages) > rrpc->nr_sects);
> +	BUG_ON((r->l_start + nr_pages) > rrpc->nr_sects);
>  
>  	rrpc_unlock_laddr(rrpc, r);
>  }
> diff --git a/include/linux/lightnvm.h b/include/linux/lightnvm.h
> index b94f2d5..eda9743 100644
> --- a/include/linux/lightnvm.h
> +++ b/include/linux/lightnvm.h
> @@ -231,6 +231,8 @@ struct nvm_rq {
>  	void *metadata;
>  	dma_addr_t dma_metadata;
>  
> +	void *priv;
> +
>  	struct completion *wait;
>  	nvm_end_io_fn *end_io;
>  
> @@ -243,12 +245,12 @@ struct nvm_rq {
>  
>  static inline struct nvm_rq *nvm_rq_from_pdu(void *pdu)
>  {
> -	return pdu - sizeof(struct nvm_rq);
> +	return container_of(pdu, struct nvm_rq, priv);
>  }
>  
>  static inline void *nvm_rq_to_pdu(struct nvm_rq *rqdata)
>  {
> -	return rqdata + 1;
> +	return rqdata->priv;
>  }

I think the priv is unnecessary. This should be able to be expressed in
a single structure. E.g. have the rrpc_rq be a union of the data types
you either need when processing a read or a write. It seems that they no
longer have much in common.


--
To unsubscribe from this list: send the line "unsubscribe linux-block" in
the body of a message to majordomo@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
=?UTF-8?q?Javier=20Gonz=C3=A1lez?= Feb. 8, 2016, 7:31 a.m. UTC | #2
Hi Matias,

Thanks for the quick reply and revision. I’ll add the changes and merge
the other patches for the next version. I have a couple of remarks
inline.


> On 05 Feb 2016, at 15:52, Matias Bjørling <mb@lightnvm.io> wrote:
> 
> On 02/04/2016 02:08 PM, Javier González wrote:
>> Flash controllers typically define flash pages as a collection of flash
>> sectors of typically 4K. Moreover, flash controllers might program flash
>> pages across several planes. This defines the write granurality at which
>> flash can be programmed. This is different for each flash controller. In
>> rrpc, the assumption has been that flash pages are 4K, thus writes could
>> be sent out to the media as they were received.
>> 
>> This patch adds a per-block write buffer to rrpc. Writes are flushed to
>> the media in chuncks of the minimum granurality allowed by the
>> controller, though the strategy can be changed to, for example, only
>> flush at the maximum supported (64 pages in nvme). Apart from enabling
>> the use of rrpc in real hardware, this buffering strategy will be used
>> for recovery; if a block becomes bad, a new block can be allocated to
>> persist valid data.
> 
> Great work. The code is nearly 2x'ed in size. Can you add a detailed
> explanation on the inner workings of the write buffering to help new
> users to understand its logic?

Thanks. Sure, I’ll add a description of the internals in the next version.

> 
>> Signed-off-by: Javier González <javier@cnexlabs.com>
>> ---
>> drivers/lightnvm/rrpc.c  | 851 ++++++++++++++++++++++++++++++++++++++---------
>> drivers/lightnvm/rrpc.h  |  82 ++++-
>> include/linux/lightnvm.h |   6 +-
>> 3 files changed, 771 insertions(+), 168 deletions(-)
>> 
>> diff --git a/drivers/lightnvm/rrpc.c b/drivers/lightnvm/rrpc.c
>> index 8187bf3..e9fb19d 100644
>> --- a/drivers/lightnvm/rrpc.c
>> +++ b/drivers/lightnvm/rrpc.c
>> @@ -16,11 +16,12 @@
>> 
>> #include "rrpc.h"
>> 
>> -static struct kmem_cache *rrpc_gcb_cache, *rrpc_rq_cache;
>> +static struct kmem_cache *rrpc_gcb_cache, *rrpc_rq_cache, *rrpc_rrq_cache,
>> +			*rrpc_buf_rrq_cache, *rrpc_wb_cache, *rrpc_block_cache;
>> static DECLARE_RWSEM(rrpc_lock);
>> 
>> static int rrpc_submit_io(struct rrpc *rrpc, struct bio *bio,
>> -				struct nvm_rq *rqd, unsigned long flags);
>> +				struct rrpc_rq *rqd, unsigned long flags);
>> 
>> #define rrpc_for_each_lun(rrpc, rlun, i) \
>> 		for ((i) = 0, rlun = &(rrpc)->luns[0]; \
>> @@ -62,53 +63,59 @@ static void rrpc_invalidate_range(struct rrpc *rrpc, sector_t slba,
>> 	spin_unlock(&rrpc->rev_lock);
>> }
>> 
>> -static struct nvm_rq *rrpc_inflight_laddr_acquire(struct rrpc *rrpc,
>> +static void rrpc_release_and_free_rrqd(struct kref *ref)
>> +{
>> +	struct rrpc_rq *rrqd = container_of(ref, struct rrpc_rq, refs);
>> +	struct rrpc *rrpc = rrqd->addr->rblk->rlun->rrpc;
> 
> Wow.. Happy that we got to rrpc :)

Yes, I know… I'll find another way :)

> 
>> +
>> +	rrpc_unlock_rq(rrpc, rrqd);
>> +	mempool_free(rrqd, rrpc->rrq_pool);
>> +}
>> +
>> +static struct rrpc_rq *rrpc_inflight_laddr_acquire(struct rrpc *rrpc,
>> 					sector_t laddr, unsigned int pages)
>> {
>> -	struct nvm_rq *rqd;
>> +	struct rrpc_rq *rrqd;
>> 	struct rrpc_inflight_rq *inf;
>> 
>> -	rqd = mempool_alloc(rrpc->rq_pool, GFP_ATOMIC);
>> -	if (!rqd)
>> +	rrqd = mempool_alloc(rrpc->rrq_pool, GFP_ATOMIC);
>> +	if (!rrqd)
>> 		return ERR_PTR(-ENOMEM);
>> +	kref_init(&rrqd->refs);
>> 
>> -	inf = rrpc_get_inflight_rq(rqd);
>> +	inf = rrpc_get_inflight_rq(rrqd);
>> 	if (rrpc_lock_laddr(rrpc, laddr, pages, inf)) {
>> -		mempool_free(rqd, rrpc->rq_pool);
>> +		mempool_free(rrqd, rrpc->rrq_pool);
>> 		return NULL;
>> 	}
>> 
>> -	return rqd;
>> +	return rrqd;
>> }
>> 
>> -static void rrpc_inflight_laddr_release(struct rrpc *rrpc, struct nvm_rq *rqd)
>> +static void rrpc_inflight_laddr_release(struct rrpc *rrpc, struct rrpc_rq *rrqd)
>> {
>> -	struct rrpc_inflight_rq *inf = rrpc_get_inflight_rq(rqd);
>> -
>> -	rrpc_unlock_laddr(rrpc, inf);
>> -
>> -	mempool_free(rqd, rrpc->rq_pool);
>> +	kref_put(&rrqd->refs, rrpc_release_and_free_rrqd);
>> }
>> 
>> static void rrpc_discard(struct rrpc *rrpc, struct bio *bio)
>> {
>> 	sector_t slba = bio->bi_iter.bi_sector / NR_PHY_IN_LOG;
>> 	sector_t len = bio->bi_iter.bi_size / RRPC_EXPOSED_PAGE_SIZE;
>> -	struct nvm_rq *rqd;
>> +	struct rrpc_rq *rrqd;
>> 
>> 	do {
>> -		rqd = rrpc_inflight_laddr_acquire(rrpc, slba, len);
>> +		rrqd = rrpc_inflight_laddr_acquire(rrpc, slba, len);
>> 		schedule();
>> -	} while (!rqd);
>> +	} while (!rrqd);
>> 
>> -	if (IS_ERR(rqd)) {
>> +	if (IS_ERR(rrqd)) {
>> 		pr_err("rrpc: unable to acquire inflight IO\n");
>> 		bio_io_error(bio);
>> 		return;
>> 	}
>> 
>> 	rrpc_invalidate_range(rrpc, slba, len);
>> -	rrpc_inflight_laddr_release(rrpc, rqd);
>> +	rrpc_inflight_laddr_release(rrpc, rrqd);
>> }
>> 
>> static int block_is_full(struct rrpc *rrpc, struct rrpc_block *rblk)
>> @@ -166,8 +173,6 @@ static void rrpc_set_lun_cur(struct rrpc_lun *rlun, struct rrpc_block *rblk)
>> {
>> 	struct rrpc *rrpc = rlun->rrpc;
>> 
>> -	BUG_ON(!rblk);
>> -
>> 	if (rlun->cur) {
>> 		spin_lock(&rlun->cur->lock);
>> 		WARN_ON(!block_is_full(rrpc, rlun->cur));
>> @@ -176,12 +181,107 @@ static void rrpc_set_lun_cur(struct rrpc_lun *rlun, struct rrpc_block *rblk)
>> 	rlun->cur = rblk;
>> }
>> 
>> +static void rrpc_free_w_buffer(struct rrpc *rrpc, struct rrpc_block *rblk)
>> +{
>> +try:
>> +	spin_lock(&rblk->w_buf.s_lock);
>> +	if (atomic_read(&rblk->w_buf.refs) > 0) {
>> +		spin_unlock(&rblk->w_buf.s_lock);
>> +		schedule();
>> +		goto try;
>> +	}
>> +
>> +	mempool_free(rblk->w_buf.entries, rrpc->block_pool);
>> +	rblk->w_buf.entries = NULL;
>> +	spin_unlock(&rblk->w_buf.s_lock);
>> +
>> +	/* TODO: Reuse the same buffers if the block size is the same */
>> +	mempool_free(rblk->w_buf.data, rrpc->write_buf_pool);
>> +	kfree(rblk->w_buf.sync_bitmap);
>> +
>> +	rblk->w_buf.mem = NULL;
>> +	rblk->w_buf.subm = NULL;
>> +	rblk->w_buf.sync_bitmap = NULL;
>> +	rblk->w_buf.data = NULL;
>> +	rblk->w_buf.nentries = 0;
>> +	rblk->w_buf.cur_mem = 0;
>> +	rblk->w_buf.cur_subm = 0;
>> +}
>> +
>> +static void rrpc_put_blk(struct rrpc *rrpc, struct rrpc_block *rblk)
>> +{
>> +	struct rrpc_lun *rlun = rblk->rlun;
>> +	struct nvm_lun *lun = rlun->parent;
>> +	struct rrpc_w_buf *buf = &rblk->w_buf;
>> +
>> +try:
>> +	spin_lock(&buf->w_lock);
>> +	/* Flush inflight I/Os */
>> +	if (!bitmap_full(buf->sync_bitmap, buf->cur_mem)) {
>> +		spin_unlock(&buf->w_lock);
>> +		schedule();
>> +		goto try;
>> +	}
>> +	spin_unlock(&buf->w_lock);
>> +
>> +	if (rblk->w_buf.entries)
>> +		rrpc_free_w_buffer(rrpc, rblk);
>> +
>> +	spin_lock(&lun->lock);
>> +	nvm_put_blk_unlocked(rrpc->dev, rblk->parent);
>> +	list_del(&rblk->list);
>> +	spin_unlock(&lun->lock);
>> +}
>> +
>> +static void rrpc_put_blks(struct rrpc *rrpc)
>> +{
>> +	struct rrpc_lun *rlun;
>> +	int i;
>> +
>> +	for (i = 0; i < rrpc->nr_luns; i++) {
>> +		rlun = &rrpc->luns[i];
>> +		if (rlun->cur)
>> +			rrpc_put_blk(rrpc, rlun->cur);
>> +		if (rlun->gc_cur)
>> +			rrpc_put_blk(rrpc, rlun->gc_cur);
>> +	}
>> +}
>> +
>> static struct rrpc_block *rrpc_get_blk(struct rrpc *rrpc, struct rrpc_lun *rlun,
>> 							unsigned long flags)
>> {
>> +	struct nvm_dev *dev = rrpc->dev;
>> 	struct nvm_lun *lun = rlun->parent;
>> 	struct nvm_block *blk;
>> 	struct rrpc_block *rblk;
>> +	struct buf_entry *entries;
>> +	unsigned long *sync_bitmap;
>> +	void *data;
>> +	int nentries = dev->pgs_per_blk * dev->sec_per_pg;
>> +
>> +	data = mempool_alloc(rrpc->write_buf_pool, GFP_ATOMIC);
>> +	if (!data) {
>> +		pr_err("nvm: rrpc: cannot allocate write buffer for block\n");
>> +		return NULL;
>> +	}
>> +
>> +	entries = mempool_alloc(rrpc->block_pool, GFP_ATOMIC);
>> +	if (!entries) {
>> +		pr_err("nvm: rrpc: cannot allocate write buffer for block\n");
>> +		mempool_free(data, rrpc->write_buf_pool);
>> +		return NULL;
>> +	}
>> +
>> +	/* TODO: Mempool? */
> 
> Not on fast path. I don't think we need mempools at all to take some
> unnecessary memory.

Ok.

> 
>> +	sync_bitmap = kmalloc(BITS_TO_LONGS(nentries) *
>> +					sizeof(unsigned long), GFP_ATOMIC);
>> +	if (!sync_bitmap) {
>> +		mempool_free(data, rrpc->write_buf_pool);
>> +		mempool_free(entries, rrpc->block_pool);
>> +		return NULL;
>> +	}
>> +
>> +	bitmap_zero(sync_bitmap, nentries);
>> 
>> 	spin_lock(&lun->lock);
>> 	blk = nvm_get_blk_unlocked(rrpc->dev, rlun->parent, flags);
>> @@ -192,43 +292,34 @@ static struct rrpc_block *rrpc_get_blk(struct rrpc *rrpc, struct rrpc_lun *rlun,
>> 	}
>> 
>> 	rblk = &rlun->blocks[blk->id];
>> -	list_add_tail(&rblk->list, &rlun->open_list);
>> -	spin_unlock(&lun->lock);
>> 
>> 	blk->priv = rblk;
>> -	bitmap_zero(rblk->invalid_pages, rrpc->dev->pgs_per_blk);
>> +	bitmap_zero(rblk->invalid_pages, dev->pgs_per_blk);
>> 	rblk->next_page = 0;
>> 	rblk->nr_invalid_pages = 0;
>> -	atomic_set(&rblk->data_cmnt_size, 0);
>> +
>> +	rblk->w_buf.data = data;
>> +	rblk->w_buf.entries = entries;
>> +	rblk->w_buf.sync_bitmap = sync_bitmap;
>> +
>> +	rblk->w_buf.entries->data  = rblk->w_buf.data;
>> +	rblk->w_buf.mem = rblk->w_buf.entries;
>> +	rblk->w_buf.subm = rblk->w_buf.entries;
>> +	rblk->w_buf.nentries = nentries;
>> +	rblk->w_buf.cur_mem = 0;
>> +	rblk->w_buf.cur_subm = 0;
>> +
>> +	atomic_set(&rblk->w_buf.refs, 0);
>> +
>> +	spin_lock_init(&rblk->w_buf.w_lock);
>> +	spin_lock_init(&rblk->w_buf.s_lock);
>> +
>> +	list_add_tail(&rblk->list, &rlun->open_list);
>> +	spin_unlock(&lun->lock);
>> 
>> 	return rblk;
>> }
>> 
>> -static void rrpc_put_blk(struct rrpc *rrpc, struct rrpc_block *rblk)
>> -{
>> -	struct rrpc_lun *rlun = rblk->rlun;
>> -	struct nvm_lun *lun = rlun->parent;
>> -
>> -	spin_lock(&lun->lock);
>> -	nvm_put_blk_unlocked(rrpc->dev, rblk->parent);
>> -	list_del(&rblk->list);
>> -	spin_unlock(&lun->lock);
>> -}
>> -
>> -static void rrpc_put_blks(struct rrpc *rrpc)
>> -{
>> -	struct rrpc_lun *rlun;
>> -	int i;
>> -
>> -	for (i = 0; i < rrpc->nr_luns; i++) {
>> -		rlun = &rrpc->luns[i];
>> -		if (rlun->cur)
>> -			rrpc_put_blk(rrpc, rlun->cur);
>> -		if (rlun->gc_cur)
>> -			rrpc_put_blk(rrpc, rlun->gc_cur);
>> -	}
>> -}
>> -
>> static struct rrpc_lun *get_next_lun(struct rrpc *rrpc)
>> {
>> 	int next = atomic_inc_return(&rrpc->next_lun);
>> @@ -247,6 +338,17 @@ static void rrpc_gc_kick(struct rrpc *rrpc)
>> 	}
>> }
>> 
>> +static void rrpc_writer_kick(struct rrpc *rrpc)
>> +{
>> +	struct rrpc_lun *rlun;
>> +	unsigned int i;
>> +
>> +	for (i = 0; i < rrpc->nr_luns; i++) {
>> +		rlun = &rrpc->luns[i];
>> +		queue_work(rrpc->kw_wq, &rlun->ws_writer);
>> +	}
>> +}
>> +
>> /*
>>  * timed GC every interval.
>>  */
>> @@ -282,7 +384,7 @@ static int rrpc_move_valid_pages(struct rrpc *rrpc, struct rrpc_block *rblk)
>> {
>> 	struct request_queue *q = rrpc->dev->q;
>> 	struct rrpc_rev_addr *rev;
>> -	struct nvm_rq *rqd;
>> +	struct rrpc_rq *rrqd;
>> 	struct bio *bio;
>> 	struct page *page;
>> 	int slot;
>> @@ -306,7 +408,7 @@ static int rrpc_move_valid_pages(struct rrpc *rrpc, struct rrpc_block *rblk)
>> 	}
>> 
>> 	while ((slot = find_first_zero_bit(rblk->invalid_pages,
>> -					    nr_pgs_per_blk)) < nr_pgs_per_blk) {
>> +					nr_pgs_per_blk)) < nr_pgs_per_blk) {
> 
> No need to fix the indentation here.
> 
>> /* Lock laddr */
>> 		phys_addr = (rblk->parent->id * nr_pgs_per_blk) + slot;
>> @@ -321,8 +423,8 @@ try:
>> 			continue;
>> 		}
>> 
>> -		rqd = rrpc_inflight_laddr_acquire(rrpc, rev->addr, 1);
>> -		if (IS_ERR_OR_NULL(rqd)) {
>> +		rrqd = rrpc_inflight_laddr_acquire(rrpc, rev->addr, 1);
>> +		if (IS_ERR_OR_NULL(rrqd)) {
>> 			spin_unlock(&rrpc->rev_lock);
>> 			schedule();
>> 			goto try;
>> @@ -339,14 +441,14 @@ try:
>> 		/* TODO: may fail when EXP_PG_SIZE > PAGE_SIZE */
>> 		bio_add_pc_page(q, bio, page, RRPC_EXPOSED_PAGE_SIZE, 0);
>> 
>> -		if (rrpc_submit_io(rrpc, bio, rqd, NVM_IOTYPE_GC)) {
>> +		if (rrpc_submit_io(rrpc, bio, rrqd, NVM_IOTYPE_GC)) {
>> 			pr_err("rrpc: gc read failed.\n");
>> -			rrpc_inflight_laddr_release(rrpc, rqd);
>> +			rrpc_inflight_laddr_release(rrpc, rrqd);
>> 			goto finished;
>> 		}
>> 		wait_for_completion_io(&wait);
>> 		if (bio->bi_error) {
>> -			rrpc_inflight_laddr_release(rrpc, rqd);
>> +			rrpc_inflight_laddr_release(rrpc, rrqd);
>> 			goto finished;
>> 		}
>> 
>> @@ -363,14 +465,16 @@ try:
>> 		/* turn the command around and write the data back to a new
>> 		 * address
>> 		 */
>> -		if (rrpc_submit_io(rrpc, bio, rqd, NVM_IOTYPE_GC)) {
>> +		if (rrpc_submit_io(rrpc, bio, rrqd, NVM_IOTYPE_GC)
>> +							!= NVM_IO_DONE) {
>> 			pr_err("rrpc: gc write failed.\n");
>> -			rrpc_inflight_laddr_release(rrpc, rqd);
>> +			rrpc_inflight_laddr_release(rrpc, rrqd);
>> 			goto finished;
>> 		}
>> +		bio_endio(bio);
>> 		wait_for_completion_io(&wait);
>> 
>> -		rrpc_inflight_laddr_release(rrpc, rqd);
>> +		/* rrpc_inflight_laddr_release(rrpc, rrqd); */
> 
> This is commented out.
> 
>> if (bio->bi_error)
>> 			goto finished;
>> 
>> @@ -514,6 +618,8 @@ static void rrpc_gc_queue(struct work_struct *work)
>> 	list_move_tail(&rblk->list, &rlun->closed_list);
>> 	spin_unlock(&lun->lock);
>> 
>> +	rrpc_free_w_buffer(rrpc, rblk);
>> +
>> 	mempool_free(gcb, rrpc->gcb_pool);
>> 	pr_debug("nvm: block '%lu' is full, allow GC (sched)\n",
>> 							rblk->parent->id);
>> @@ -663,43 +769,72 @@ static void rrpc_run_gc(struct rrpc *rrpc, struct rrpc_block *rblk)
>> 	queue_work(rrpc->kgc_wq, &gcb->ws_gc);
>> }
>> 
>> -static void rrpc_end_io_write(struct rrpc *rrpc, struct rrpc_rq *rrqd,
>> -						sector_t laddr, uint8_t npages)
>> +static void rrpc_sync_buffer(struct rrpc *rrpc, struct rrpc_addr *p)
>> {
>> -	struct rrpc_addr *p;
>> 	struct rrpc_block *rblk;
>> +	struct rrpc_w_buf *buf;
>> 	struct nvm_lun *lun;
>> -	int cmnt_size, i;
>> +	unsigned long bppa;
>> 
>> -	for (i = 0; i < npages; i++) {
>> -		p = &rrpc->trans_map[laddr + i];
>> -		rblk = p->rblk;
>> -		lun = rblk->parent->lun;
>> +	rblk = p->rblk;
>> +	buf = &rblk->w_buf;
>> +	lun = rblk->parent->lun;
>> 
>> -		cmnt_size = atomic_inc_return(&rblk->data_cmnt_size);
>> -		if (unlikely(cmnt_size == rrpc->dev->pgs_per_blk))
>> -			rrpc_run_gc(rrpc, rblk);
>> +	bppa = rrpc->dev->sec_per_blk * rblk->parent->id;
>> +
>> +	WARN_ON(test_and_set_bit((p->addr - bppa), buf->sync_bitmap));
>> +
>> +	if (unlikely(bitmap_full(buf->sync_bitmap, buf->nentries))) {
>> +		/* Write buffer out-of-bounds */
>> +		WARN_ON((buf->cur_mem != buf->nentries) &&
>> +					(buf->cur_mem != buf->cur_subm));
>> +
>> +		rrpc_run_gc(rrpc, rblk);
>> +	}
>> +}
>> +
>> +static void rrpc_end_io_write(struct rrpc *rrpc, struct nvm_rq *rqd,
>> +							uint8_t nr_pages)
>> +{
>> +	struct rrpc_buf_rq *brrqd = nvm_rq_to_pdu(rqd);
>> +	struct rrpc_rq *rrqd;
>> +	int i;
>> +
>> +	for (i = 0; i < nr_pages; i++) {
>> +		rrqd = brrqd[i].rrqd;
>> +		rrpc_sync_buffer(rrpc, brrqd[i].addr);
>> +		kref_put(&rrqd->refs, rrpc_release_and_free_rrqd);
>> 	}
>> +
>> +	mempool_free(brrqd, rrpc->m_rrq_pool);
>> +	rrpc_writer_kick(rrpc);
>> +}
>> +
>> +static void rrpc_end_io_read(struct rrpc *rrpc, struct nvm_rq *rqd,
>> +							uint8_t nr_pages)
>> +{
>> +	struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
>> +
>> +	if (rqd->flags & NVM_IOTYPE_GC)
>> +		return;
>> +
>> +	rrpc_unlock_rq(rrpc, rrqd);
>> +	mempool_free(rrqd, rrpc->rrq_pool);
>> }
>> 
>> static void rrpc_end_io(struct nvm_rq *rqd)
>> {
>> 	struct rrpc *rrpc = container_of(rqd->ins, struct rrpc, instance);
>> -	struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
>> -	uint8_t npages = rqd->nr_pages;
>> -	sector_t laddr = rrpc_get_laddr(rqd->bio) - npages;
>> +	uint8_t nr_pages = rqd->nr_pages;
>> 
>> 	if (bio_data_dir(rqd->bio) == WRITE)
>> -		rrpc_end_io_write(rrpc, rrqd, laddr, npages);
>> +		rrpc_end_io_write(rrpc, rqd, nr_pages);
>> +	else
>> +		rrpc_end_io_read(rrpc, rqd, nr_pages);
>> 
>> 	bio_put(rqd->bio);
>> 
>> -	if (rrqd->flags & NVM_IOTYPE_GC)
>> -		return;
>> -
>> -	rrpc_unlock_rq(rrpc, rqd);
>> -
>> -	if (npages > 1)
>> +	if (nr_pages > 1)
> 
> Cleanup patches can go in afterwards or before.
Ok.

> 
>> nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
>> 	if (rqd->metadata)
>> 		nvm_dev_dma_free(rrpc->dev, rqd->metadata, rqd->dma_metadata);
>> @@ -708,20 +843,24 @@ static void rrpc_end_io(struct nvm_rq *rqd)
>> }
>> 
>> static int rrpc_read_ppalist_rq(struct rrpc *rrpc, struct bio *bio,
>> -			struct nvm_rq *rqd, unsigned long flags, int npages)
>> +			struct nvm_rq *rqd, struct rrpc_buf_rq *brrqd,
>> +			unsigned long flags, int nr_pages)
>> {
>> -	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
>> +	struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
>> +	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
>> 	struct rrpc_addr *gp;
>> 	sector_t laddr = rrpc_get_laddr(bio);
>> 	int is_gc = flags & NVM_IOTYPE_GC;
>> 	int i;
>> 
>> -	if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd)) {
>> +	if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
>> 		nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
>> +		mempool_free(rrqd, rrpc->rrq_pool);
>> +		mempool_free(rqd, rrpc->rq_pool);
> 
> Seems like there is a relationship here between rrq_pool (rrpc_rq) and
> rq_pool (nvm_rq)
> 
> To get to a single mempool alloc ( and just keep the nvm_rq mempool )

Yes. I have a patch splitting rrqd into a different struct for the write
and the read path. they ended up being quite different afterall...
> 
> 
> 
>> return NVM_IO_REQUEUE;
>> 	}
>> 
>> -	for (i = 0; i < npages; i++) {
>> +	for (i = 0; i < nr_pages; i++) {
>> 		/* We assume that mapping occurs at 4KB granularity */
>> 		BUG_ON(!(laddr + i >= 0 && laddr + i < rrpc->nr_sects));
>> 		gp = &rrpc->trans_map[laddr + i];
>> @@ -734,8 +873,12 @@ static int rrpc_read_ppalist_rq(struct rrpc *rrpc, struct bio *bio,
>> 			rrpc_unlock_laddr(rrpc, r);
>> 			nvm_dev_dma_free(rrpc->dev, rqd->ppa_list,
>> 							rqd->dma_ppa_list);
>> +			mempool_free(rrqd, rrpc->rrq_pool);
>> +			mempool_free(rqd, rrpc->rq_pool);
>> 			return NVM_IO_DONE;
>> 		}
>> +
>> +		brrqd[i].addr = gp;
>> 	}
>> 
>> 	rqd->opcode = NVM_OP_HBREAD;
>> @@ -751,8 +894,11 @@ static int rrpc_read_rq(struct rrpc *rrpc, struct bio *bio, struct nvm_rq *rqd,
>> 	sector_t laddr = rrpc_get_laddr(bio);
>> 	struct rrpc_addr *gp;
>> 
>> -	if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd))
>> +	if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
>> +		mempool_free(rrqd, rrpc->rrq_pool);
>> +		mempool_free(rqd, rrpc->rq_pool);
>> 		return NVM_IO_REQUEUE;
>> +	}
>> 
>> 	BUG_ON(!(laddr >= 0 && laddr < rrpc->nr_sects));
>> 	gp = &rrpc->trans_map[laddr];
>> @@ -761,7 +907,9 @@ static int rrpc_read_rq(struct rrpc *rrpc, struct bio *bio, struct nvm_rq *rqd,
>> 		rqd->ppa_addr = rrpc_ppa_to_gaddr(rrpc->dev, gp->addr);
>> 	} else {
>> 		BUG_ON(is_gc);
>> -		rrpc_unlock_rq(rrpc, rqd);
>> +		rrpc_unlock_rq(rrpc, rrqd);
>> +		mempool_free(rrqd, rrpc->rrq_pool);
>> +		mempool_free(rqd, rrpc->rq_pool);
>> 		return NVM_IO_DONE;
>> 	}
>> 
>> @@ -771,120 +919,190 @@ static int rrpc_read_rq(struct rrpc *rrpc, struct bio *bio, struct nvm_rq *rqd,
>> 	return NVM_IO_OK;
>> }
>> 
>> +/*
>> + * Copy data from current bio to block write buffer. This if necessary
>> + * to guarantee durability if a flash block becomes bad before all pages
>> + * are written. This buffer is also used to write at the right page
>> + * granurality
>> + */
>> +static int rrpc_write_to_buffer(struct rrpc *rrpc, struct bio *bio,
>> +				struct rrpc_rq *rrqd, struct rrpc_addr *addr,
>> +				struct rrpc_w_buf *w_buf,
>> +				unsigned long flags)
>> +{
>> +	struct nvm_dev *dev = rrpc->dev;
>> +	unsigned int bio_len = bio_cur_bytes(bio);
>> +
>> +	if (bio_len != RRPC_EXPOSED_PAGE_SIZE)
>> +		return NVM_IO_ERR;
>> +
>> +	spin_lock(&w_buf->w_lock);
>> +
>> +	WARN_ON(w_buf->cur_mem == w_buf->nentries);
>> +
>> +	w_buf->mem->rrqd = rrqd;
>> +	w_buf->mem->addr = addr;
>> +	w_buf->mem->flags = flags;
>> +
>> +	memcpy(w_buf->mem->data, bio_data(bio), bio_len);
>> +
>> +	w_buf->cur_mem++;
>> +	if (likely(w_buf->cur_mem < w_buf->nentries)) {
>> +		w_buf->mem++;
>> +		w_buf->mem->data =
>> +				w_buf->data + (w_buf->cur_mem * dev->sec_size);
>> +	}
>> +
>> +	spin_unlock(&w_buf->w_lock);
>> +
>> +	return 0;
>> +}
>> +
>> static int rrpc_write_ppalist_rq(struct rrpc *rrpc, struct bio *bio,
>> -			struct nvm_rq *rqd, unsigned long flags, int npages)
>> +			struct rrpc_rq *rrqd, unsigned long flags, int nr_pages)
>> {
>> -	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
>> +	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
>> +	struct rrpc_w_buf *w_buf;
>> 	struct rrpc_addr *p;
>> +	struct rrpc_lun *rlun;
>> 	sector_t laddr = rrpc_get_laddr(bio);
>> 	int is_gc = flags & NVM_IOTYPE_GC;
>> +	int err;
>> 	int i;
>> 
>> -	if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd)) {
>> -		nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
>> +	if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
>> +		mempool_free(rrqd, rrpc->rrq_pool);
>> 		return NVM_IO_REQUEUE;
>> 	}
>> 
>> -	for (i = 0; i < npages; i++) {
>> +	for (i = 0; i < nr_pages; i++) {
>> +		kref_get(&rrqd->refs);
>> +
>> 		/* We assume that mapping occurs at 4KB granularity */
>> 		p = rrpc_map_page(rrpc, laddr + i, is_gc);
>> 		if (!p) {
>> 			BUG_ON(is_gc);
>> 			rrpc_unlock_laddr(rrpc, r);
>> -			nvm_dev_dma_free(rrpc->dev, rqd->ppa_list,
>> -							rqd->dma_ppa_list);
>> +			mempool_free(rrqd, rrpc->rrq_pool);
>> 			rrpc_gc_kick(rrpc);
>> 			return NVM_IO_REQUEUE;
>> 		}
>> 
>> -		rqd->ppa_list[i] = rrpc_ppa_to_gaddr(rrpc->dev,
>> -								p->addr);
>> +		w_buf = &p->rblk->w_buf;
>> +		rlun = p->rblk->rlun;
>> +
>> +		rrqd->addr = p;
>> +
>> +		err = rrpc_write_to_buffer(rrpc, bio, rrqd, p, w_buf, flags);
>> +		if (err) {
>> +			pr_err("rrpc: could not write to write buffer\n");
>> +			return err;
>> +		}
>> +
>> +		bio_advance(bio, RRPC_EXPOSED_PAGE_SIZE);
>> +
>> +		queue_work(rrpc->kw_wq, &rlun->ws_writer);
> 
> How about a timer and only the kick when a flush command is sent?

Ok. My though was that we might want to flush the buffer as fast as we
can since we will have several open blocks at the same time. I can try
with the timer + flush when an explicit flush or fua is sent and test
the memory pressure.

> 
>> }
>> 
>> -	rqd->opcode = NVM_OP_HBWRITE;
>> +	if (kref_put(&rrqd->refs, rrpc_release_and_free_rrqd)) {
>> +		pr_err("rrpc: request reference counter dailed\n");
>> +		return NVM_IO_ERR;
>> +	}
>> 
>> -	return NVM_IO_OK;
>> +	return NVM_IO_DONE;
>> }
>> 
>> static int rrpc_write_rq(struct rrpc *rrpc, struct bio *bio,
>> -				struct nvm_rq *rqd, unsigned long flags)
>> +				struct rrpc_rq *rrqd, unsigned long flags)
>> {
>> -	struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
>> +	struct rrpc_w_buf *w_buf;
>> 	struct rrpc_addr *p;
>> +	struct rrpc_lun *rlun;
>> 	int is_gc = flags & NVM_IOTYPE_GC;
>> +	int err;
>> 	sector_t laddr = rrpc_get_laddr(bio);
>> 
>> -	if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd))
>> +	if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
>> +		mempool_free(rrqd, rrpc->rrq_pool);
>> 		return NVM_IO_REQUEUE;
>> +	}
>> 
>> 	p = rrpc_map_page(rrpc, laddr, is_gc);
>> 	if (!p) {
>> 		BUG_ON(is_gc);
>> -		rrpc_unlock_rq(rrpc, rqd);
>> +		rrpc_unlock_rq(rrpc, rrqd);
>> +		mempool_free(rrqd, rrpc->rrq_pool);
>> 		rrpc_gc_kick(rrpc);
>> 		return NVM_IO_REQUEUE;
>> 	}
>> 
>> -	rqd->ppa_addr = rrpc_ppa_to_gaddr(rrpc->dev, p->addr);
>> -	rqd->opcode = NVM_OP_HBWRITE;
>> +	w_buf = &p->rblk->w_buf;
>> +	rlun = p->rblk->rlun;
>> +
>> 	rrqd->addr = p;
>> 
>> -	return NVM_IO_OK;
>> +	err = rrpc_write_to_buffer(rrpc, bio, rrqd, p, w_buf, flags);
>> +	if (err) {
>> +		pr_err("rrpc: could not write to write buffer\n");
>> +		return err;
>> +	}
>> +
>> +	queue_work(rrpc->kw_wq, &rlun->ws_writer);
>> +	return NVM_IO_DONE;
>> }
>> 
>> -static int rrpc_setup_rq(struct rrpc *rrpc, struct bio *bio,
>> -			struct nvm_rq *rqd, unsigned long flags, uint8_t npages)
>> +static int rrpc_submit_read(struct rrpc *rrpc, struct bio *bio,
>> +				struct rrpc_rq *rrqd, unsigned long flags)
>> {
>> -	if (npages > 1) {
>> +	struct nvm_rq *rqd;
>> +	struct rrpc_buf_rq brrqd[rrpc->max_write_pgs];
>> +	uint8_t nr_pages = rrpc_get_pages(bio);
>> +	int err;
>> +
>> +	rqd = mempool_alloc(rrpc->rq_pool, GFP_KERNEL);
>> +	if (!rqd) {
>> +		pr_err_ratelimited("rrpc: not able to queue bio.");
>> +		bio_io_error(bio);
>> +		return NVM_IO_ERR;
>> +	}
>> +	rqd->metadata = NULL;
>> +	rqd->priv = rrqd;
>> +
>> +	if (nr_pages > 1) {
>> 		rqd->ppa_list = nvm_dev_dma_alloc(rrpc->dev, GFP_KERNEL,
>> -							&rqd->dma_ppa_list);
>> +						&rqd->dma_ppa_list);
>> 		if (!rqd->ppa_list) {
>> 			pr_err("rrpc: not able to allocate ppa list\n");
>> +			mempool_free(rrqd, rrpc->rrq_pool);
>> +			mempool_free(rqd, rrpc->rq_pool);
>> 			return NVM_IO_ERR;
>> 		}
>> 
>> -		if (bio_rw(bio) == WRITE)
>> -			return rrpc_write_ppalist_rq(rrpc, bio, rqd, flags,
>> -									npages);
>> -
>> -		return rrpc_read_ppalist_rq(rrpc, bio, rqd, flags, npages);
>> +		err = rrpc_read_ppalist_rq(rrpc, bio, rqd, brrqd, flags,
>> +								nr_pages);
>> +		if (err) {
>> +			mempool_free(rrqd, rrpc->rrq_pool);
>> +			mempool_free(rqd, rrpc->rq_pool);
>> +			return err;
>> +		}
>> +	} else {
>> +		err = rrpc_read_rq(rrpc, bio, rqd, flags);
>> +		if (err)
>> +			return err;
>> 	}
>> 
>> -	if (bio_rw(bio) == WRITE)
>> -		return rrpc_write_rq(rrpc, bio, rqd, flags);
>> -
>> -	return rrpc_read_rq(rrpc, bio, rqd, flags);
>> -}
>> -
>> -static int rrpc_submit_io(struct rrpc *rrpc, struct bio *bio,
>> -				struct nvm_rq *rqd, unsigned long flags)
>> -{
>> -	int err;
>> -	struct rrpc_rq *rrq = nvm_rq_to_pdu(rqd);
>> -	uint8_t nr_pages = rrpc_get_pages(bio);
>> -	int bio_size = bio_sectors(bio) << 9;
>> -
>> -	if (bio_size < rrpc->dev->sec_size)
>> -		return NVM_IO_ERR;
>> -	else if (bio_size > rrpc->dev->max_rq_size)
>> -		return NVM_IO_ERR;
>> -
>> -	err = rrpc_setup_rq(rrpc, bio, rqd, flags, nr_pages);
>> -	if (err)
>> -		return err;
>> -
>> 	bio_get(bio);
>> 	rqd->bio = bio;
>> 	rqd->ins = &rrpc->instance;
>> -	rqd->nr_pages = nr_pages;
>> -	rrq->flags = flags;
>> +	rqd->nr_pages = rrqd->nr_pages = nr_pages;
>> +	rqd->flags = flags;
>> 
>> 	err = nvm_submit_io(rrpc->dev, rqd);
>> 	if (err) {
>> 		pr_err("rrpc: I/O submission failed: %d\n", err);
>> 		bio_put(bio);
>> 		if (!(flags & NVM_IOTYPE_GC)) {
>> -			rrpc_unlock_rq(rrpc, rqd);
>> +			rrpc_unlock_rq(rrpc, rrqd);
>> 			if (rqd->nr_pages > 1)
>> 				nvm_dev_dma_free(rrpc->dev,
>> 			rqd->ppa_list, rqd->dma_ppa_list);
>> @@ -895,10 +1113,39 @@ static int rrpc_submit_io(struct rrpc *rrpc, struct bio *bio,
>> 	return NVM_IO_OK;
>> }
>> 
>> +static int rrpc_buffer_write(struct rrpc *rrpc, struct bio *bio,
>> +				struct rrpc_rq *rrqd, unsigned long flags)
>> +{
>> +	uint8_t nr_pages = rrpc_get_pages(bio);
>> +
>> +	rrqd->nr_pages = nr_pages;
>> +
>> +	if (nr_pages > 1)
>> +		return rrpc_write_ppalist_rq(rrpc, bio, rrqd, flags, nr_pages);
>> +	else
> 
> You can skip the else here.
> 
>> +		return rrpc_write_rq(rrpc, bio, rrqd, flags);
>> +}
>> +
>> +static int rrpc_submit_io(struct rrpc *rrpc, struct bio *bio,
>> +				struct rrpc_rq *rrqd, unsigned long flags)
>> +{
>> +	int bio_size = bio_sectors(bio) << 9;
>> +
>> +	if (bio_size < rrpc->dev->sec_size)
>> +		return NVM_IO_ERR;
>> +	else if (bio_size > rrpc->dev->max_rq_size)
>> +		return NVM_IO_ERR;
> 
> I have a patch incoming that removes this. Properly making
> rrpc_submit_io obsolete.

Could you push it to for-next so that I can rebase on top of it for the
next version?

> 
>> +
>> +	if (bio_rw(bio) == READ)
>> +		return rrpc_submit_read(rrpc, bio, rrqd, flags);
>> +
>> +	return rrpc_buffer_write(rrpc, bio, rrqd, flags);
>> +}
>> +
>> static blk_qc_t rrpc_make_rq(struct request_queue *q, struct bio *bio)
>> {
>> 	struct rrpc *rrpc = q->queuedata;
>> -	struct nvm_rq *rqd;
>> +	struct rrpc_rq *rrqd;
>> 	int err;
>> 
>> 	if (bio->bi_rw & REQ_DISCARD) {
>> @@ -906,15 +1153,15 @@ static blk_qc_t rrpc_make_rq(struct request_queue *q, struct bio *bio)
>> 		return BLK_QC_T_NONE;
>> 	}
>> 
>> -	rqd = mempool_alloc(rrpc->rq_pool, GFP_KERNEL);
>> -	if (!rqd) {
>> -		pr_err_ratelimited("rrpc: not able to queue bio.");
>> +	rrqd = mempool_alloc(rrpc->rrq_pool, GFP_KERNEL);
>> +	if (!rrqd) {
>> +		pr_err_ratelimited("rrpc: not able to allocate rrqd.");
>> 		bio_io_error(bio);
>> 		return BLK_QC_T_NONE;
>> 	}
>> -	memset(rqd, 0, sizeof(struct nvm_rq));
> 
> It seems like we don't use nvm_rq in the write patch? (the writer thread
> will do the write itself). Move this inside read? and use the original
> nvm_rq for submission?

Ok.

> 
>> +	kref_init(&rrqd->refs);
>> 
>> -	err = rrpc_submit_io(rrpc, bio, rqd, NVM_IOTYPE_NONE);
>> +	err = rrpc_submit_io(rrpc, bio, rrqd, NVM_IOTYPE_NONE);
>> 	switch (err) {
>> 	case NVM_IO_OK:
>> 		return BLK_QC_T_NONE;
>> @@ -932,10 +1179,221 @@ static blk_qc_t rrpc_make_rq(struct request_queue *q, struct bio *bio)
>> 		break;
>> 	}
>> 
>> -	mempool_free(rqd, rrpc->rq_pool);
>> 	return BLK_QC_T_NONE;
>> }
>> 
>> +static int rrpc_alloc_page_in_bio(struct rrpc *rrpc, struct bio *bio,
>> +								void *data)
>> +{
>> +	struct page *page;
>> +	int err;
>> +
>> +	if (PAGE_SIZE != RRPC_EXPOSED_PAGE_SIZE)
>> +		return -1;
> 
> return -EINVAL?
> 
> This doesn't work on power and sparch architectures (with larger
> PAGE_SIZEs). Would be great to handle that as well.

This is intrinsic to the rrpc design: from the map table to the bio
formation in GC, the underlying assumption is that PAGE_SIZE ==
RRPC_EXPOSED_PAGE_SIZE. It requires more changes than this bio formation
to support different page sizes. I think this belongs to a different
patch.

> 
>> +
>> +	page = virt_to_page(data);
>> +	if (!page) {
>> +		pr_err("nvm: rrpc: could not alloc page\n");
>> +		return -1;
>> +	}
>> +
>> +	err = bio_add_page(bio, page, RRPC_EXPOSED_PAGE_SIZE, 0);
>> +	if (err != RRPC_EXPOSED_PAGE_SIZE) {
>> +		pr_err("nvm: rrpc: could not add page to bio\n");
>> +		return -1;
>> +	}
>> +
>> +	return 0;
>> +}
>> +
>> +static void rrpc_submit_write(struct work_struct *work)
>> +{
>> +	struct rrpc_lun *rlun = container_of(work, struct rrpc_lun, ws_writer);
>> +	struct rrpc *rrpc = rlun->rrpc;
>> +	struct nvm_dev *dev = rrpc->dev;
>> +	struct rrpc_addr *addr;
>> +	struct rrpc_rq *rrqd;
>> +	struct rrpc_buf_rq *brrqd;
>> +	void *data;
>> +	struct nvm_rq *rqd;
>> +	struct rrpc_block *rblk;
>> +	struct bio *bio;
>> +	int pgs_to_sync, pgs_avail;
>> +	int sync = NVM_SYNC_HARD;
>> +	int err;
>> +	int i;
>> +
>> +	/* Note that OS pages are typically mapped to flash page sectors, which
>> +	 * are 4K; a flash page might be formed of several sectors. Also,
>> +	 * controllers typically program flash pages across multiple planes.
>> +	 * This is the flash programing granurality, and the reason behind the
>> +	 * sync strategy performed in this write thread.
>> +	 */
>> +try:
>> +	spin_lock(&rlun->parent->lock);
>> +	list_for_each_entry(rblk, &rlun->open_list, list) {
>> +		if (!spin_trylock(&rblk->w_buf.w_lock))
>> +			continue;
>> +
>> +		/* If the write thread has already submitted all I/Os in the
>> +		 * write buffer for this block ignore that the block is in the
>> +		 * open list; it is on its way to the closed list. This enables
>> +		 * us to avoid taking a lock on the list.
>> +		 */
>> +		if (unlikely(rblk->w_buf.cur_subm == rblk->w_buf.nentries)) {
>> +			spin_unlock(&rblk->w_buf.w_lock);
>> +			spin_unlock(&rlun->parent->lock);
>> +			schedule();
>> +			goto try;
>> +		}
>> +		pgs_avail = rblk->w_buf.cur_mem - rblk->w_buf.cur_subm;
>> +
>> +		switch (sync) {
>> +		case NVM_SYNC_SOFT:
>> +			pgs_to_sync = (pgs_avail >= rrpc->max_write_pgs) ?
>> +					rrpc->max_write_pgs : 0;
>> +			break;
>> +		case NVM_SYNC_HARD:
>> +			if (pgs_avail >= rrpc->max_write_pgs)
>> +				pgs_to_sync = rrpc->max_write_pgs;
>> +			else if (pgs_avail >= rrpc->min_write_pgs)
>> +				pgs_to_sync = rrpc->min_write_pgs *
>> +					(pgs_avail / rrpc->min_write_pgs);
>> +			else
>> +				pgs_to_sync = pgs_avail; /* TODO: ADD PADDING */
>> +			break;
>> +		case NVM_SYNC_OPORT:
>> +			if (pgs_avail >= rrpc->max_write_pgs)
>> +				pgs_to_sync = rrpc->max_write_pgs;
>> +			else if (pgs_avail >= rrpc->min_write_pgs)
>> +				pgs_to_sync = rrpc->min_write_pgs *
>> +					(pgs_avail / rrpc->min_write_pgs);
>> +			else
>> +				pgs_to_sync = 0;
>> +		}
>> +
>> +		if (pgs_to_sync == 0) {
>> +			spin_unlock(&rblk->w_buf.w_lock);
>> +			continue;
>> +		}
>> +
>> +		bio = bio_alloc(GFP_ATOMIC, pgs_to_sync);
>> +		if (!bio) {
>> +			pr_err("nvm: rrpc: could not alloc write bio\n");
>> +			goto out1;
>> +		}
>> +
>> +		rqd = mempool_alloc(rrpc->rq_pool, GFP_ATOMIC);
>> +		if (!rqd) {
>> +			pr_err_ratelimited("rrpc: not able to create w req.");
>> +			goto out2;
>> +		}
>> +		rqd->metadata = NULL;
>> +
>> +		brrqd = mempool_alloc(rrpc->m_rrq_pool, GFP_ATOMIC);
>> +		if (!brrqd) {
>> +			pr_err_ratelimited("rrpc: not able to create w rea.");
>> +			goto out3;
>> +		}
>> +
>> +		bio->bi_iter.bi_sector = 0; /* artificial bio */
>> +		bio->bi_rw = WRITE;
>> +
>> +		rqd->opcode = NVM_OP_HBWRITE;
>> +		rqd->bio = bio;
>> +		rqd->ins = &rrpc->instance;
>> +		rqd->nr_pages = pgs_to_sync;
>> +		rqd->priv = brrqd;
>> +
>> +		if (pgs_to_sync == 1) {
>> +			rrqd = rblk->w_buf.subm->rrqd;
>> +			addr = rblk->w_buf.subm->addr;
>> +			rqd->flags = rblk->w_buf.subm->flags;
>> +			data = rblk->w_buf.subm->data;
>> +
>> +			err = rrpc_alloc_page_in_bio(rrpc, bio, data);
>> +			if (err) {
>> +				pr_err("rrpc: cannot allocate page in bio\n");
>> +				goto out4;
>> +			}
>> +
>> +			/* TODO: This address can be skipped */
>> +			if (addr->addr == ADDR_EMPTY)
>> +				pr_err_ratelimited("rrpc: submitting empty rq");
>> +
>> +			rqd->ppa_addr = rrpc_ppa_to_gaddr(dev, addr->addr);
>> +
>> +			brrqd[0].rrqd = rrqd;
>> +			brrqd[0].addr = addr;
>> +
>> +			rblk->w_buf.subm++;
>> +			rblk->w_buf.cur_subm++;
>> +
>> +			goto submit_io;
>> +		}
>> +
>> +		/* This bio will contain several pppas */
>> +		rqd->ppa_list = nvm_dev_dma_alloc(rrpc->dev, GFP_ATOMIC,
>> +							&rqd->dma_ppa_list);
>> +		if (!rqd->ppa_list) {
>> +			pr_err("rrpc: not able to allocate ppa list\n");
>> +			goto out4;
>> +		}
>> +
>> +		for (i = 0; i < pgs_to_sync; i++) {
>> +			rrqd = rblk->w_buf.subm->rrqd;
>> +			addr = rblk->w_buf.subm->addr;
>> +			rqd->flags = rblk->w_buf.subm->flags;
>> +			data = rblk->w_buf.subm->data;
>> +
>> +			err = rrpc_alloc_page_in_bio(rrpc, bio, data);
>> +			if (err) {
>> +				pr_err("rrpc: cannot allocate page in bio\n");
>> +				goto out5;
>> +			}
>> +
>> +			/* TODO: This address can be skipped */
>> +			if (addr->addr == ADDR_EMPTY)
>> +				pr_err_ratelimited("rrpc: submitting empty rq");
>> +
>> +			rqd->ppa_list[i] = rrpc_ppa_to_gaddr(dev, addr->addr);
>> +
>> +			brrqd[i].rrqd = rrqd;
>> +			brrqd[i].addr = addr;
>> +
>> +			rblk->w_buf.subm++;
>> +			rblk->w_buf.cur_subm++;
>> +		}
>> +
>> +submit_io:
>> +		WARN_ON(rblk->w_buf.cur_subm > rblk->w_buf.nentries);
>> +
>> +		spin_unlock(&rblk->w_buf.w_lock);
>> +
>> +		err = nvm_submit_io(dev, rqd);
>> +		if (err) {
>> +			pr_err("rrpc: I/O submission failed: %d\n", err);
>> +			mempool_free(rqd, rrpc->rq_pool);
>> +			bio_put(bio);
>> +		}
>> +	}
>> +
>> +	spin_unlock(&rlun->parent->lock);
>> +	return;
>> +
>> +out5:
>> +	nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
>> +out4:
>> +	mempool_free(brrqd, rrpc->m_rrq_pool);
>> +out3:
>> +	mempool_free(rqd, rrpc->rq_pool);
>> +out2:
>> +	bio_put(bio);
>> +out1:
>> +	spin_unlock(&rblk->w_buf.w_lock);
>> +	spin_unlock(&rlun->parent->lock);
>> +}
>> +
> 
> The function can at least be split up in two parts. The what to sync and
> the actual write. The ret* labels should be replaced with more sane names.

Ok.

> 
>> static void rrpc_requeue(struct work_struct *work)
>> {
>> 	struct rrpc *rrpc = container_of(work, struct rrpc, ws_requeue);
>> @@ -978,7 +1436,7 @@ static void rrpc_gc_free(struct rrpc *rrpc)
>> 
>> static int rrpc_gc_init(struct rrpc *rrpc)
>> {
>> -	rrpc->krqd_wq = alloc_workqueue("rrpc-lun", WQ_MEM_RECLAIM|WQ_UNBOUND,
>> +	rrpc->krqd_wq = alloc_workqueue("rrpc-lun", WQ_MEM_RECLAIM | WQ_UNBOUND,
> 
> No need for reformatting
> 
>> rrpc->nr_luns);
>> 	if (!rrpc->krqd_wq)
>> 		return -ENOMEM;
>> @@ -1080,6 +1538,8 @@ static int rrpc_map_init(struct rrpc *rrpc)
>> 
>> static int rrpc_core_init(struct rrpc *rrpc)
>> {
>> +	struct nvm_dev *dev = rrpc->dev;
>> +
>> 	down_write(&rrpc_lock);
>> 	if (!rrpc_gcb_cache) {
>> 		rrpc_gcb_cache = kmem_cache_create("rrpc_gcb",
>> @@ -1089,14 +1549,70 @@ static int rrpc_core_init(struct rrpc *rrpc)
>> 			return -ENOMEM;
>> 		}
>> 
>> -		rrpc_rq_cache = kmem_cache_create("rrpc_rq",
>> -				sizeof(struct nvm_rq) + sizeof(struct rrpc_rq),
>> -				0, 0, NULL);
>> +		rrpc_rq_cache = kmem_cache_create("nvm_rq",
>> +					sizeof(struct nvm_rq), 0, 0, NULL);
>> 		if (!rrpc_rq_cache) {
>> 			kmem_cache_destroy(rrpc_gcb_cache);
>> 			up_write(&rrpc_lock);
>> 			return -ENOMEM;
>> 		}
>> +
>> +		rrpc_rrq_cache = kmem_cache_create("rrpc_rrq",
>> +					sizeof(struct rrpc_rq), 0, 0, NULL);
>> +		if (!rrpc_rrq_cache) {
>> +			kmem_cache_destroy(rrpc_gcb_cache);
>> +			kmem_cache_destroy(rrpc_rq_cache);
>> +			up_write(&rrpc_lock);
>> +			return -ENOMEM;
>> +		}
>> +
>> +		rrpc_buf_rrq_cache = kmem_cache_create("rrpc_m_rrq",
>> +			rrpc->max_write_pgs * sizeof(struct rrpc_buf_rq),
>> +			0, 0, NULL);
>> +		if (!rrpc_buf_rrq_cache) {
>> +			kmem_cache_destroy(rrpc_gcb_cache);
>> +			kmem_cache_destroy(rrpc_rq_cache);
>> +			kmem_cache_destroy(rrpc_rrq_cache);
>> +			up_write(&rrpc_lock);
>> +			return -ENOMEM;
>> +		}
>> +	}
>> +
>> +	/* we assume that sec->sec_size is the same as the page size exposed by
>> +	 * rrpc (4KB). We need extra logic otherwise
>> +	 */
>> +	if (!rrpc_block_cache) {
>> +		/* Write buffer: Allocate all buffer (for all block) at once. We
>> +		 * avoid having to allocate a memory from the pool for each IO
>> +		 * at the cost pre-allocating memory for the whole block when a
>> +		 * new block is allocated from the media manager.
>> +		 */
>> +		rrpc_wb_cache = kmem_cache_create("nvm_wb",
>> +			dev->pgs_per_blk * dev->sec_per_pg * dev->sec_size,
>> +			0, 0, NULL);
> 
> Seems a bit excessive to allocate a mempool for this. Why not do a
> vmalloc for when the memory is needed? The normal case is 4MB? per
> entry, and the cache creates 8 of them in the pool, using 64MB out of
> the gate. We are not in a hot path at this point in time, so lets not
> waste the resource when we might not use them.

Good. Thanks!

> 
>> +		if (!rrpc_wb_cache) {
>> +			kmem_cache_destroy(rrpc_gcb_cache);
>> +			kmem_cache_destroy(rrpc_rq_cache);
>> +			kmem_cache_destroy(rrpc_rrq_cache);
>> +			kmem_cache_destroy(rrpc_buf_rrq_cache);
>> +			up_write(&rrpc_lock);
>> +			return -ENOMEM;
>> +		}
>> +
>> +		/* Write buffer entries */
>> +		rrpc_block_cache = kmem_cache_create("nvm_entry",
>> +			dev->pgs_per_blk * dev->sec_per_pg *
>> +			sizeof(struct buf_entry),
>> +			0, 0, NULL);
>> +		if (!rrpc_block_cache) {
>> +			kmem_cache_destroy(rrpc_gcb_cache);
>> +			kmem_cache_destroy(rrpc_rq_cache);
>> +			kmem_cache_destroy(rrpc_rrq_cache);
>> +			kmem_cache_destroy(rrpc_buf_rrq_cache);
>> +			kmem_cache_destroy(rrpc_wb_cache);
>> +			up_write(&rrpc_lock);
>> +			return -ENOMEM;
>> +		}
>> 	}
>> 	up_write(&rrpc_lock);
>> 
>> @@ -1113,17 +1629,45 @@ static int rrpc_core_init(struct rrpc *rrpc)
>> 	if (!rrpc->rq_pool)
>> 		return -ENOMEM;
>> 
>> +	rrpc->rrq_pool = mempool_create_slab_pool(64, rrpc_rrq_cache);
>> +	if (!rrpc->rrq_pool)
>> +		return -ENOMEM;
>> +
>> +	rrpc->m_rrq_pool = mempool_create_slab_pool(64, rrpc_buf_rrq_cache);
>> +	if (!rrpc->m_rrq_pool)
>> +		return -ENOMEM;
>> +
>> +	rrpc->block_pool = mempool_create_slab_pool(8, rrpc_block_cache);
>> +	if (!rrpc->block_pool)
>> +		return -ENOMEM;
>> +
>> +	rrpc->write_buf_pool = mempool_create_slab_pool(8, rrpc_wb_cache);
>> +	if (!rrpc->write_buf_pool)
>> +		return -ENOMEM;
> 
> Alot of pools. I hope the nvm_rq, rrpc_rq, and rrpc_buf_rq consolidation
> can move it back to just a nvm_rq entry)
> 
>> +
>> 	spin_lock_init(&rrpc->inflights.lock);
>> 	INIT_LIST_HEAD(&rrpc->inflights.reqs);
>> 
>> +	rrpc->kw_wq = alloc_workqueue("rrpc-writer",
>> +				WQ_MEM_RECLAIM | WQ_UNBOUND, rrpc->nr_luns);
>> +	if (!rrpc->kw_wq)
>> +		return -ENOMEM;
>> +
>> 	return 0;
>> }
>> 
>> static void rrpc_core_free(struct rrpc *rrpc)
>> {
>> +	if (rrpc->kw_wq)
>> +		destroy_workqueue(rrpc->kw_wq);
>> +
>> 	mempool_destroy(rrpc->page_pool);
>> 	mempool_destroy(rrpc->gcb_pool);
>> +	mempool_destroy(rrpc->m_rrq_pool);
>> +	mempool_destroy(rrpc->rrq_pool);
>> 	mempool_destroy(rrpc->rq_pool);
>> +	mempool_destroy(rrpc->block_pool);
>> +	mempool_destroy(rrpc->write_buf_pool);
>> }
>> 
>> static void rrpc_luns_free(struct rrpc *rrpc)
>> @@ -1164,6 +1708,8 @@ static int rrpc_luns_init(struct rrpc *rrpc, int lun_begin, int lun_end)
>> 		INIT_LIST_HEAD(&rlun->open_list);
>> 		INIT_LIST_HEAD(&rlun->closed_list);
>> 
>> +		INIT_WORK(&rlun->ws_writer, rrpc_submit_write);
>> +
>> 		INIT_WORK(&rlun->ws_gc, rrpc_lun_gc);
>> 		spin_lock_init(&rlun->lock);
>> 
>> @@ -1209,6 +1755,7 @@ static void rrpc_exit(void *private)
>> 
>> 	flush_workqueue(rrpc->krqd_wq);
>> 	flush_workqueue(rrpc->kgc_wq);
>> +	/* flush_workqueue(rrpc->kw_wq); */ /* TODO: Implement flush + padding*/
> 
> This can come in an additional patch.
> 
>> rrpc_free(rrpc);
>> }
>> diff --git a/drivers/lightnvm/rrpc.h b/drivers/lightnvm/rrpc.h
>> index 868e91a..6e188b4 100644
>> --- a/drivers/lightnvm/rrpc.h
>> +++ b/drivers/lightnvm/rrpc.h
>> @@ -49,17 +49,67 @@ struct rrpc_inflight_rq {
>> struct rrpc_rq {
>> 	struct rrpc_inflight_rq inflight_rq;
>> 	struct rrpc_addr *addr;
>> +	int nr_pages;
>> +
>> +	struct kref refs;
> 
> I think what you really want to do here is to add the ref counter to
> rrpc_inflight_addr and then have that maintain it to when to free.
> (also, name it kref)
> 
>> +};
>> +
>> +struct rrpc_buf_rq {
>> +	struct rrpc_addr *addr;
>> +	struct rrpc_rq *rrqd;
> 
> I'm not sure why we keep a rrpc_buf_rq->addr and rrpc_rq->add?

The reason is that in the multipage path rrqd is shared among several
ppas (the ones sent in the same bio); rrqd->addr points to the first
addr. Note that the rrqd field in the buffer structure points to the
rrqd formed in the original bio - thus the use of kref. In
rrpc_buf_rq->addr we store the actual addr corresponding to the ppa,
which we use in the write thread. I think this will be much cleaner when
rrqd is split for reads and writes.


>> +};
>> +
>> +/* Sync strategies from write buffer to media */
>> +enum {
>> +	NVM_SYNC_SOFT	= 0x0,		/* Only submit at max_write_pgs
>> +					 * supported by the device. Typically 64
>> +					 * pages (256k).
>> +					 */
>> +	NVM_SYNC_HARD	= 0x1,		/* Submit the whole buffer. Add padding
>> +					 * if necessary to respect the device's
>> +					 * min_write_pgs.
>> +					 */
>> +	NVM_SYNC_OPORT	= 0x2,		/* Submit what we can, always respecting
>> +					 * the device's min_write_pgs.
>> +					 */
> 
> That is great. This should properly be exposed as a sysfs option later.

Sure. Good idea!

>> +};
>> +
>> +struct buf_entry {
>> +	struct rrpc_rq *rrqd;
>> +	void *data;
>> +	struct rrpc_addr *addr;
>> 	unsigned long flags;
>> };
>> 
>> +struct rrpc_w_buf {
>> +	struct buf_entry *entries;	/* Entries */
>> +	struct buf_entry *mem;		/* Points to the next writable entry */
>> +	struct buf_entry *subm;		/* Points to the last submitted entry */
>> +	int cur_mem;			/* Current memory entry. Follows mem */
>> +	int cur_subm;			/* Entries have been submitted to dev */
>> +	int nentries;			/* Number of entries in write buffer */
> 
> nr_entries?

:)

> 
>> +
>> +	void *data;			/* Actual data */
>> +	unsigned long *sync_bitmap;	/* Bitmap representing physical
>> +					 * addresses that have been synced to
>> +					 * the media
>> +					 */
>> +
>> +	atomic_t refs;
> 
> kref? semaphore?

Yes.

> 
>> +
>> +	spinlock_t w_lock;
>> +	spinlock_t s_lock;
> 
> is it short for submission lock?

It is short for sync_lock.

> 
>> +};
>> +
>> struct rrpc_block {
>> 	struct nvm_block *parent;
>> 	struct rrpc_lun *rlun;
>> 	struct list_head prio;
>> 	struct list_head list;
>> +	struct rrpc_w_buf w_buf;
>> 
>> #define MAX_INVALID_PAGES_STORAGE 8
>> -	/* Bitmap for invalid page intries */
>> +	/* Bitmap for invalid page entries */
>> 	unsigned long invalid_pages[MAX_INVALID_PAGES_STORAGE];
>> 	/* points to the next writable page within a block */
>> 	unsigned int next_page;
>> @@ -67,7 +117,6 @@ struct rrpc_block {
>> 	unsigned int nr_invalid_pages;
>> 
>> 	spinlock_t lock;
>> -	atomic_t data_cmnt_size; /* data pages committed to stable storage */
>> };
>> 
>> struct rrpc_lun {
>> @@ -86,6 +135,7 @@ struct rrpc_lun {
>> 					 */
>> 
>> 	struct work_struct ws_gc;
>> +	struct work_struct ws_writer;
>> 
>> 	spinlock_t lock;
>> };
>> @@ -136,10 +186,15 @@ struct rrpc {
>> 	mempool_t *page_pool;
>> 	mempool_t *gcb_pool;
>> 	mempool_t *rq_pool;
>> +	mempool_t *rrq_pool;
>> +	mempool_t *m_rrq_pool;
>> +	mempool_t *block_pool;
>> +	mempool_t *write_buf_pool;
>> 
>> 	struct timer_list gc_timer;
>> 	struct workqueue_struct *krqd_wq;
>> 	struct workqueue_struct *kgc_wq;
>> +	struct workqueue_struct *kw_wq;
>> };
>> 
>> struct rrpc_block_gc {
>> @@ -181,7 +236,7 @@ static inline int request_intersects(struct rrpc_inflight_rq *r,
>> }
>> 
>> static int __rrpc_lock_laddr(struct rrpc *rrpc, sector_t laddr,
>> -			     unsigned pages, struct rrpc_inflight_rq *r)
>> +				unsigned pages, struct rrpc_inflight_rq *r)
>> {
>> 	sector_t laddr_end = laddr + pages - 1;
>> 	struct rrpc_inflight_rq *rtmp;
>> @@ -206,27 +261,26 @@ static int __rrpc_lock_laddr(struct rrpc *rrpc, sector_t laddr,
>> }
>> 
>> static inline int rrpc_lock_laddr(struct rrpc *rrpc, sector_t laddr,
>> -				 unsigned pages,
>> -				 struct rrpc_inflight_rq *r)
>> +				unsigned pages,
>> +				struct rrpc_inflight_rq *r)
>> {
>> 	BUG_ON((laddr + pages) > rrpc->nr_sects);
>> 
>> 	return __rrpc_lock_laddr(rrpc, laddr, pages, r);
>> }
>> 
>> -static inline struct rrpc_inflight_rq *rrpc_get_inflight_rq(struct nvm_rq *rqd)
>> +static inline struct rrpc_inflight_rq
>> +				*rrpc_get_inflight_rq(struct rrpc_rq *rrqd)
>> {
>> -	struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
>> -
>> 	return &rrqd->inflight_rq;
>> }
>> 
>> static inline int rrpc_lock_rq(struct rrpc *rrpc, struct bio *bio,
>> -							struct nvm_rq *rqd)
>> +							struct rrpc_rq *rrqd)
>> {
>> 	sector_t laddr = rrpc_get_laddr(bio);
>> 	unsigned int pages = rrpc_get_pages(bio);
>> -	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
>> +	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
>> 
>> 	return rrpc_lock_laddr(rrpc, laddr, pages, r);
>> }
>> @@ -241,12 +295,12 @@ static inline void rrpc_unlock_laddr(struct rrpc *rrpc,
>> 	spin_unlock_irqrestore(&rrpc->inflights.lock, flags);
>> }
>> 
>> -static inline void rrpc_unlock_rq(struct rrpc *rrpc, struct nvm_rq *rqd)
>> +static inline void rrpc_unlock_rq(struct rrpc *rrpc, struct rrpc_rq *rrqd)
>> {
>> -	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
>> -	uint8_t pages = rqd->nr_pages;
>> +	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
>> +	uint8_t nr_pages = rrqd->nr_pages;
> 
> The cleanups can be moved to another patch.
Ok.

> 
>> -	BUG_ON((r->l_start + pages) > rrpc->nr_sects);
>> +	BUG_ON((r->l_start + nr_pages) > rrpc->nr_sects);
>> 
>> 	rrpc_unlock_laddr(rrpc, r);
>> }
>> diff --git a/include/linux/lightnvm.h b/include/linux/lightnvm.h
>> index b94f2d5..eda9743 100644
>> --- a/include/linux/lightnvm.h
>> +++ b/include/linux/lightnvm.h
>> @@ -231,6 +231,8 @@ struct nvm_rq {
>> 	void *metadata;
>> 	dma_addr_t dma_metadata;
>> 
>> +	void *priv;
>> +
>> 	struct completion *wait;
>> 	nvm_end_io_fn *end_io;
>> 
>> @@ -243,12 +245,12 @@ struct nvm_rq {
>> 
>> static inline struct nvm_rq *nvm_rq_from_pdu(void *pdu)
>> {
>> -	return pdu - sizeof(struct nvm_rq);
>> +	return container_of(pdu, struct nvm_rq, priv);
>> }
>> 
>> static inline void *nvm_rq_to_pdu(struct nvm_rq *rqdata)
>> {
>> -	return rqdata + 1;
>> +	return rqdata->priv;
>> }
> 
> I think the priv is unnecessary. This should be able to be expressed in
> a single structure. E.g. have the rrpc_rq be a union of the data types
> you either need when processing a read or a write. It seems that they no
> longer have much in common.

Yes, I’ll try to come back to the original single pool allocation,
probably differentiating between reads and writes, since as mentioned,
the structure will be different.


Thanks,
Javier.

Patch
diff mbox

diff --git a/drivers/lightnvm/rrpc.c b/drivers/lightnvm/rrpc.c
index 8187bf3..e9fb19d 100644
--- a/drivers/lightnvm/rrpc.c
+++ b/drivers/lightnvm/rrpc.c
@@ -16,11 +16,12 @@ 
 
 #include "rrpc.h"
 
-static struct kmem_cache *rrpc_gcb_cache, *rrpc_rq_cache;
+static struct kmem_cache *rrpc_gcb_cache, *rrpc_rq_cache, *rrpc_rrq_cache,
+			*rrpc_buf_rrq_cache, *rrpc_wb_cache, *rrpc_block_cache;
 static DECLARE_RWSEM(rrpc_lock);
 
 static int rrpc_submit_io(struct rrpc *rrpc, struct bio *bio,
-				struct nvm_rq *rqd, unsigned long flags);
+				struct rrpc_rq *rqd, unsigned long flags);
 
 #define rrpc_for_each_lun(rrpc, rlun, i) \
 		for ((i) = 0, rlun = &(rrpc)->luns[0]; \
@@ -62,53 +63,59 @@  static void rrpc_invalidate_range(struct rrpc *rrpc, sector_t slba,
 	spin_unlock(&rrpc->rev_lock);
 }
 
-static struct nvm_rq *rrpc_inflight_laddr_acquire(struct rrpc *rrpc,
+static void rrpc_release_and_free_rrqd(struct kref *ref)
+{
+	struct rrpc_rq *rrqd = container_of(ref, struct rrpc_rq, refs);
+	struct rrpc *rrpc = rrqd->addr->rblk->rlun->rrpc;
+
+	rrpc_unlock_rq(rrpc, rrqd);
+	mempool_free(rrqd, rrpc->rrq_pool);
+}
+
+static struct rrpc_rq *rrpc_inflight_laddr_acquire(struct rrpc *rrpc,
 					sector_t laddr, unsigned int pages)
 {
-	struct nvm_rq *rqd;
+	struct rrpc_rq *rrqd;
 	struct rrpc_inflight_rq *inf;
 
-	rqd = mempool_alloc(rrpc->rq_pool, GFP_ATOMIC);
-	if (!rqd)
+	rrqd = mempool_alloc(rrpc->rrq_pool, GFP_ATOMIC);
+	if (!rrqd)
 		return ERR_PTR(-ENOMEM);
+	kref_init(&rrqd->refs);
 
-	inf = rrpc_get_inflight_rq(rqd);
+	inf = rrpc_get_inflight_rq(rrqd);
 	if (rrpc_lock_laddr(rrpc, laddr, pages, inf)) {
-		mempool_free(rqd, rrpc->rq_pool);
+		mempool_free(rrqd, rrpc->rrq_pool);
 		return NULL;
 	}
 
-	return rqd;
+	return rrqd;
 }
 
-static void rrpc_inflight_laddr_release(struct rrpc *rrpc, struct nvm_rq *rqd)
+static void rrpc_inflight_laddr_release(struct rrpc *rrpc, struct rrpc_rq *rrqd)
 {
-	struct rrpc_inflight_rq *inf = rrpc_get_inflight_rq(rqd);
-
-	rrpc_unlock_laddr(rrpc, inf);
-
-	mempool_free(rqd, rrpc->rq_pool);
+	kref_put(&rrqd->refs, rrpc_release_and_free_rrqd);
 }
 
 static void rrpc_discard(struct rrpc *rrpc, struct bio *bio)
 {
 	sector_t slba = bio->bi_iter.bi_sector / NR_PHY_IN_LOG;
 	sector_t len = bio->bi_iter.bi_size / RRPC_EXPOSED_PAGE_SIZE;
-	struct nvm_rq *rqd;
+	struct rrpc_rq *rrqd;
 
 	do {
-		rqd = rrpc_inflight_laddr_acquire(rrpc, slba, len);
+		rrqd = rrpc_inflight_laddr_acquire(rrpc, slba, len);
 		schedule();
-	} while (!rqd);
+	} while (!rrqd);
 
-	if (IS_ERR(rqd)) {
+	if (IS_ERR(rrqd)) {
 		pr_err("rrpc: unable to acquire inflight IO\n");
 		bio_io_error(bio);
 		return;
 	}
 
 	rrpc_invalidate_range(rrpc, slba, len);
-	rrpc_inflight_laddr_release(rrpc, rqd);
+	rrpc_inflight_laddr_release(rrpc, rrqd);
 }
 
 static int block_is_full(struct rrpc *rrpc, struct rrpc_block *rblk)
@@ -166,8 +173,6 @@  static void rrpc_set_lun_cur(struct rrpc_lun *rlun, struct rrpc_block *rblk)
 {
 	struct rrpc *rrpc = rlun->rrpc;
 
-	BUG_ON(!rblk);
-
 	if (rlun->cur) {
 		spin_lock(&rlun->cur->lock);
 		WARN_ON(!block_is_full(rrpc, rlun->cur));
@@ -176,12 +181,107 @@  static void rrpc_set_lun_cur(struct rrpc_lun *rlun, struct rrpc_block *rblk)
 	rlun->cur = rblk;
 }
 
+static void rrpc_free_w_buffer(struct rrpc *rrpc, struct rrpc_block *rblk)
+{
+try:
+	spin_lock(&rblk->w_buf.s_lock);
+	if (atomic_read(&rblk->w_buf.refs) > 0) {
+		spin_unlock(&rblk->w_buf.s_lock);
+		schedule();
+		goto try;
+	}
+
+	mempool_free(rblk->w_buf.entries, rrpc->block_pool);
+	rblk->w_buf.entries = NULL;
+	spin_unlock(&rblk->w_buf.s_lock);
+
+	/* TODO: Reuse the same buffers if the block size is the same */
+	mempool_free(rblk->w_buf.data, rrpc->write_buf_pool);
+	kfree(rblk->w_buf.sync_bitmap);
+
+	rblk->w_buf.mem = NULL;
+	rblk->w_buf.subm = NULL;
+	rblk->w_buf.sync_bitmap = NULL;
+	rblk->w_buf.data = NULL;
+	rblk->w_buf.nentries = 0;
+	rblk->w_buf.cur_mem = 0;
+	rblk->w_buf.cur_subm = 0;
+}
+
+static void rrpc_put_blk(struct rrpc *rrpc, struct rrpc_block *rblk)
+{
+	struct rrpc_lun *rlun = rblk->rlun;
+	struct nvm_lun *lun = rlun->parent;
+	struct rrpc_w_buf *buf = &rblk->w_buf;
+
+try:
+	spin_lock(&buf->w_lock);
+	/* Flush inflight I/Os */
+	if (!bitmap_full(buf->sync_bitmap, buf->cur_mem)) {
+		spin_unlock(&buf->w_lock);
+		schedule();
+		goto try;
+	}
+	spin_unlock(&buf->w_lock);
+
+	if (rblk->w_buf.entries)
+		rrpc_free_w_buffer(rrpc, rblk);
+
+	spin_lock(&lun->lock);
+	nvm_put_blk_unlocked(rrpc->dev, rblk->parent);
+	list_del(&rblk->list);
+	spin_unlock(&lun->lock);
+}
+
+static void rrpc_put_blks(struct rrpc *rrpc)
+{
+	struct rrpc_lun *rlun;
+	int i;
+
+	for (i = 0; i < rrpc->nr_luns; i++) {
+		rlun = &rrpc->luns[i];
+		if (rlun->cur)
+			rrpc_put_blk(rrpc, rlun->cur);
+		if (rlun->gc_cur)
+			rrpc_put_blk(rrpc, rlun->gc_cur);
+	}
+}
+
 static struct rrpc_block *rrpc_get_blk(struct rrpc *rrpc, struct rrpc_lun *rlun,
 							unsigned long flags)
 {
+	struct nvm_dev *dev = rrpc->dev;
 	struct nvm_lun *lun = rlun->parent;
 	struct nvm_block *blk;
 	struct rrpc_block *rblk;
+	struct buf_entry *entries;
+	unsigned long *sync_bitmap;
+	void *data;
+	int nentries = dev->pgs_per_blk * dev->sec_per_pg;
+
+	data = mempool_alloc(rrpc->write_buf_pool, GFP_ATOMIC);
+	if (!data) {
+		pr_err("nvm: rrpc: cannot allocate write buffer for block\n");
+		return NULL;
+	}
+
+	entries = mempool_alloc(rrpc->block_pool, GFP_ATOMIC);
+	if (!entries) {
+		pr_err("nvm: rrpc: cannot allocate write buffer for block\n");
+		mempool_free(data, rrpc->write_buf_pool);
+		return NULL;
+	}
+
+	/* TODO: Mempool? */
+	sync_bitmap = kmalloc(BITS_TO_LONGS(nentries) *
+					sizeof(unsigned long), GFP_ATOMIC);
+	if (!sync_bitmap) {
+		mempool_free(data, rrpc->write_buf_pool);
+		mempool_free(entries, rrpc->block_pool);
+		return NULL;
+	}
+
+	bitmap_zero(sync_bitmap, nentries);
 
 	spin_lock(&lun->lock);
 	blk = nvm_get_blk_unlocked(rrpc->dev, rlun->parent, flags);
@@ -192,43 +292,34 @@  static struct rrpc_block *rrpc_get_blk(struct rrpc *rrpc, struct rrpc_lun *rlun,
 	}
 
 	rblk = &rlun->blocks[blk->id];
-	list_add_tail(&rblk->list, &rlun->open_list);
-	spin_unlock(&lun->lock);
 
 	blk->priv = rblk;
-	bitmap_zero(rblk->invalid_pages, rrpc->dev->pgs_per_blk);
+	bitmap_zero(rblk->invalid_pages, dev->pgs_per_blk);
 	rblk->next_page = 0;
 	rblk->nr_invalid_pages = 0;
-	atomic_set(&rblk->data_cmnt_size, 0);
+
+	rblk->w_buf.data = data;
+	rblk->w_buf.entries = entries;
+	rblk->w_buf.sync_bitmap = sync_bitmap;
+
+	rblk->w_buf.entries->data  = rblk->w_buf.data;
+	rblk->w_buf.mem = rblk->w_buf.entries;
+	rblk->w_buf.subm = rblk->w_buf.entries;
+	rblk->w_buf.nentries = nentries;
+	rblk->w_buf.cur_mem = 0;
+	rblk->w_buf.cur_subm = 0;
+
+	atomic_set(&rblk->w_buf.refs, 0);
+
+	spin_lock_init(&rblk->w_buf.w_lock);
+	spin_lock_init(&rblk->w_buf.s_lock);
+
+	list_add_tail(&rblk->list, &rlun->open_list);
+	spin_unlock(&lun->lock);
 
 	return rblk;
 }
 
-static void rrpc_put_blk(struct rrpc *rrpc, struct rrpc_block *rblk)
-{
-	struct rrpc_lun *rlun = rblk->rlun;
-	struct nvm_lun *lun = rlun->parent;
-
-	spin_lock(&lun->lock);
-	nvm_put_blk_unlocked(rrpc->dev, rblk->parent);
-	list_del(&rblk->list);
-	spin_unlock(&lun->lock);
-}
-
-static void rrpc_put_blks(struct rrpc *rrpc)
-{
-	struct rrpc_lun *rlun;
-	int i;
-
-	for (i = 0; i < rrpc->nr_luns; i++) {
-		rlun = &rrpc->luns[i];
-		if (rlun->cur)
-			rrpc_put_blk(rrpc, rlun->cur);
-		if (rlun->gc_cur)
-			rrpc_put_blk(rrpc, rlun->gc_cur);
-	}
-}
-
 static struct rrpc_lun *get_next_lun(struct rrpc *rrpc)
 {
 	int next = atomic_inc_return(&rrpc->next_lun);
@@ -247,6 +338,17 @@  static void rrpc_gc_kick(struct rrpc *rrpc)
 	}
 }
 
+static void rrpc_writer_kick(struct rrpc *rrpc)
+{
+	struct rrpc_lun *rlun;
+	unsigned int i;
+
+	for (i = 0; i < rrpc->nr_luns; i++) {
+		rlun = &rrpc->luns[i];
+		queue_work(rrpc->kw_wq, &rlun->ws_writer);
+	}
+}
+
 /*
  * timed GC every interval.
  */
@@ -282,7 +384,7 @@  static int rrpc_move_valid_pages(struct rrpc *rrpc, struct rrpc_block *rblk)
 {
 	struct request_queue *q = rrpc->dev->q;
 	struct rrpc_rev_addr *rev;
-	struct nvm_rq *rqd;
+	struct rrpc_rq *rrqd;
 	struct bio *bio;
 	struct page *page;
 	int slot;
@@ -306,7 +408,7 @@  static int rrpc_move_valid_pages(struct rrpc *rrpc, struct rrpc_block *rblk)
 	}
 
 	while ((slot = find_first_zero_bit(rblk->invalid_pages,
-					    nr_pgs_per_blk)) < nr_pgs_per_blk) {
+					nr_pgs_per_blk)) < nr_pgs_per_blk) {
 
 		/* Lock laddr */
 		phys_addr = (rblk->parent->id * nr_pgs_per_blk) + slot;
@@ -321,8 +423,8 @@  try:
 			continue;
 		}
 
-		rqd = rrpc_inflight_laddr_acquire(rrpc, rev->addr, 1);
-		if (IS_ERR_OR_NULL(rqd)) {
+		rrqd = rrpc_inflight_laddr_acquire(rrpc, rev->addr, 1);
+		if (IS_ERR_OR_NULL(rrqd)) {
 			spin_unlock(&rrpc->rev_lock);
 			schedule();
 			goto try;
@@ -339,14 +441,14 @@  try:
 		/* TODO: may fail when EXP_PG_SIZE > PAGE_SIZE */
 		bio_add_pc_page(q, bio, page, RRPC_EXPOSED_PAGE_SIZE, 0);
 
-		if (rrpc_submit_io(rrpc, bio, rqd, NVM_IOTYPE_GC)) {
+		if (rrpc_submit_io(rrpc, bio, rrqd, NVM_IOTYPE_GC)) {
 			pr_err("rrpc: gc read failed.\n");
-			rrpc_inflight_laddr_release(rrpc, rqd);
+			rrpc_inflight_laddr_release(rrpc, rrqd);
 			goto finished;
 		}
 		wait_for_completion_io(&wait);
 		if (bio->bi_error) {
-			rrpc_inflight_laddr_release(rrpc, rqd);
+			rrpc_inflight_laddr_release(rrpc, rrqd);
 			goto finished;
 		}
 
@@ -363,14 +465,16 @@  try:
 		/* turn the command around and write the data back to a new
 		 * address
 		 */
-		if (rrpc_submit_io(rrpc, bio, rqd, NVM_IOTYPE_GC)) {
+		if (rrpc_submit_io(rrpc, bio, rrqd, NVM_IOTYPE_GC)
+							!= NVM_IO_DONE) {
 			pr_err("rrpc: gc write failed.\n");
-			rrpc_inflight_laddr_release(rrpc, rqd);
+			rrpc_inflight_laddr_release(rrpc, rrqd);
 			goto finished;
 		}
+		bio_endio(bio);
 		wait_for_completion_io(&wait);
 
-		rrpc_inflight_laddr_release(rrpc, rqd);
+		/* rrpc_inflight_laddr_release(rrpc, rrqd); */
 		if (bio->bi_error)
 			goto finished;
 
@@ -514,6 +618,8 @@  static void rrpc_gc_queue(struct work_struct *work)
 	list_move_tail(&rblk->list, &rlun->closed_list);
 	spin_unlock(&lun->lock);
 
+	rrpc_free_w_buffer(rrpc, rblk);
+
 	mempool_free(gcb, rrpc->gcb_pool);
 	pr_debug("nvm: block '%lu' is full, allow GC (sched)\n",
 							rblk->parent->id);
@@ -663,43 +769,72 @@  static void rrpc_run_gc(struct rrpc *rrpc, struct rrpc_block *rblk)
 	queue_work(rrpc->kgc_wq, &gcb->ws_gc);
 }
 
-static void rrpc_end_io_write(struct rrpc *rrpc, struct rrpc_rq *rrqd,
-						sector_t laddr, uint8_t npages)
+static void rrpc_sync_buffer(struct rrpc *rrpc, struct rrpc_addr *p)
 {
-	struct rrpc_addr *p;
 	struct rrpc_block *rblk;
+	struct rrpc_w_buf *buf;
 	struct nvm_lun *lun;
-	int cmnt_size, i;
+	unsigned long bppa;
 
-	for (i = 0; i < npages; i++) {
-		p = &rrpc->trans_map[laddr + i];
-		rblk = p->rblk;
-		lun = rblk->parent->lun;
+	rblk = p->rblk;
+	buf = &rblk->w_buf;
+	lun = rblk->parent->lun;
 
-		cmnt_size = atomic_inc_return(&rblk->data_cmnt_size);
-		if (unlikely(cmnt_size == rrpc->dev->pgs_per_blk))
-			rrpc_run_gc(rrpc, rblk);
+	bppa = rrpc->dev->sec_per_blk * rblk->parent->id;
+
+	WARN_ON(test_and_set_bit((p->addr - bppa), buf->sync_bitmap));
+
+	if (unlikely(bitmap_full(buf->sync_bitmap, buf->nentries))) {
+		/* Write buffer out-of-bounds */
+		WARN_ON((buf->cur_mem != buf->nentries) &&
+					(buf->cur_mem != buf->cur_subm));
+
+		rrpc_run_gc(rrpc, rblk);
+	}
+}
+
+static void rrpc_end_io_write(struct rrpc *rrpc, struct nvm_rq *rqd,
+							uint8_t nr_pages)
+{
+	struct rrpc_buf_rq *brrqd = nvm_rq_to_pdu(rqd);
+	struct rrpc_rq *rrqd;
+	int i;
+
+	for (i = 0; i < nr_pages; i++) {
+		rrqd = brrqd[i].rrqd;
+		rrpc_sync_buffer(rrpc, brrqd[i].addr);
+		kref_put(&rrqd->refs, rrpc_release_and_free_rrqd);
 	}
+
+	mempool_free(brrqd, rrpc->m_rrq_pool);
+	rrpc_writer_kick(rrpc);
+}
+
+static void rrpc_end_io_read(struct rrpc *rrpc, struct nvm_rq *rqd,
+							uint8_t nr_pages)
+{
+	struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
+
+	if (rqd->flags & NVM_IOTYPE_GC)
+		return;
+
+	rrpc_unlock_rq(rrpc, rrqd);
+	mempool_free(rrqd, rrpc->rrq_pool);
 }
 
 static void rrpc_end_io(struct nvm_rq *rqd)
 {
 	struct rrpc *rrpc = container_of(rqd->ins, struct rrpc, instance);
-	struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
-	uint8_t npages = rqd->nr_pages;
-	sector_t laddr = rrpc_get_laddr(rqd->bio) - npages;
+	uint8_t nr_pages = rqd->nr_pages;
 
 	if (bio_data_dir(rqd->bio) == WRITE)
-		rrpc_end_io_write(rrpc, rrqd, laddr, npages);
+		rrpc_end_io_write(rrpc, rqd, nr_pages);
+	else
+		rrpc_end_io_read(rrpc, rqd, nr_pages);
 
 	bio_put(rqd->bio);
 
-	if (rrqd->flags & NVM_IOTYPE_GC)
-		return;
-
-	rrpc_unlock_rq(rrpc, rqd);
-
-	if (npages > 1)
+	if (nr_pages > 1)
 		nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
 	if (rqd->metadata)
 		nvm_dev_dma_free(rrpc->dev, rqd->metadata, rqd->dma_metadata);
@@ -708,20 +843,24 @@  static void rrpc_end_io(struct nvm_rq *rqd)
 }
 
 static int rrpc_read_ppalist_rq(struct rrpc *rrpc, struct bio *bio,
-			struct nvm_rq *rqd, unsigned long flags, int npages)
+			struct nvm_rq *rqd, struct rrpc_buf_rq *brrqd,
+			unsigned long flags, int nr_pages)
 {
-	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
+	struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
+	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
 	struct rrpc_addr *gp;
 	sector_t laddr = rrpc_get_laddr(bio);
 	int is_gc = flags & NVM_IOTYPE_GC;
 	int i;
 
-	if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd)) {
+	if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
 		nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
+		mempool_free(rrqd, rrpc->rrq_pool);
+		mempool_free(rqd, rrpc->rq_pool);
 		return NVM_IO_REQUEUE;
 	}
 
-	for (i = 0; i < npages; i++) {
+	for (i = 0; i < nr_pages; i++) {
 		/* We assume that mapping occurs at 4KB granularity */
 		BUG_ON(!(laddr + i >= 0 && laddr + i < rrpc->nr_sects));
 		gp = &rrpc->trans_map[laddr + i];
@@ -734,8 +873,12 @@  static int rrpc_read_ppalist_rq(struct rrpc *rrpc, struct bio *bio,
 			rrpc_unlock_laddr(rrpc, r);
 			nvm_dev_dma_free(rrpc->dev, rqd->ppa_list,
 							rqd->dma_ppa_list);
+			mempool_free(rrqd, rrpc->rrq_pool);
+			mempool_free(rqd, rrpc->rq_pool);
 			return NVM_IO_DONE;
 		}
+
+		brrqd[i].addr = gp;
 	}
 
 	rqd->opcode = NVM_OP_HBREAD;
@@ -751,8 +894,11 @@  static int rrpc_read_rq(struct rrpc *rrpc, struct bio *bio, struct nvm_rq *rqd,
 	sector_t laddr = rrpc_get_laddr(bio);
 	struct rrpc_addr *gp;
 
-	if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd))
+	if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
+		mempool_free(rrqd, rrpc->rrq_pool);
+		mempool_free(rqd, rrpc->rq_pool);
 		return NVM_IO_REQUEUE;
+	}
 
 	BUG_ON(!(laddr >= 0 && laddr < rrpc->nr_sects));
 	gp = &rrpc->trans_map[laddr];
@@ -761,7 +907,9 @@  static int rrpc_read_rq(struct rrpc *rrpc, struct bio *bio, struct nvm_rq *rqd,
 		rqd->ppa_addr = rrpc_ppa_to_gaddr(rrpc->dev, gp->addr);
 	} else {
 		BUG_ON(is_gc);
-		rrpc_unlock_rq(rrpc, rqd);
+		rrpc_unlock_rq(rrpc, rrqd);
+		mempool_free(rrqd, rrpc->rrq_pool);
+		mempool_free(rqd, rrpc->rq_pool);
 		return NVM_IO_DONE;
 	}
 
@@ -771,120 +919,190 @@  static int rrpc_read_rq(struct rrpc *rrpc, struct bio *bio, struct nvm_rq *rqd,
 	return NVM_IO_OK;
 }
 
+/*
+ * Copy data from current bio to block write buffer. This if necessary
+ * to guarantee durability if a flash block becomes bad before all pages
+ * are written. This buffer is also used to write at the right page
+ * granurality
+ */
+static int rrpc_write_to_buffer(struct rrpc *rrpc, struct bio *bio,
+				struct rrpc_rq *rrqd, struct rrpc_addr *addr,
+				struct rrpc_w_buf *w_buf,
+				unsigned long flags)
+{
+	struct nvm_dev *dev = rrpc->dev;
+	unsigned int bio_len = bio_cur_bytes(bio);
+
+	if (bio_len != RRPC_EXPOSED_PAGE_SIZE)
+		return NVM_IO_ERR;
+
+	spin_lock(&w_buf->w_lock);
+
+	WARN_ON(w_buf->cur_mem == w_buf->nentries);
+
+	w_buf->mem->rrqd = rrqd;
+	w_buf->mem->addr = addr;
+	w_buf->mem->flags = flags;
+
+	memcpy(w_buf->mem->data, bio_data(bio), bio_len);
+
+	w_buf->cur_mem++;
+	if (likely(w_buf->cur_mem < w_buf->nentries)) {
+		w_buf->mem++;
+		w_buf->mem->data =
+				w_buf->data + (w_buf->cur_mem * dev->sec_size);
+	}
+
+	spin_unlock(&w_buf->w_lock);
+
+	return 0;
+}
+
 static int rrpc_write_ppalist_rq(struct rrpc *rrpc, struct bio *bio,
-			struct nvm_rq *rqd, unsigned long flags, int npages)
+			struct rrpc_rq *rrqd, unsigned long flags, int nr_pages)
 {
-	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
+	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
+	struct rrpc_w_buf *w_buf;
 	struct rrpc_addr *p;
+	struct rrpc_lun *rlun;
 	sector_t laddr = rrpc_get_laddr(bio);
 	int is_gc = flags & NVM_IOTYPE_GC;
+	int err;
 	int i;
 
-	if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd)) {
-		nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
+	if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
+		mempool_free(rrqd, rrpc->rrq_pool);
 		return NVM_IO_REQUEUE;
 	}
 
-	for (i = 0; i < npages; i++) {
+	for (i = 0; i < nr_pages; i++) {
+		kref_get(&rrqd->refs);
+
 		/* We assume that mapping occurs at 4KB granularity */
 		p = rrpc_map_page(rrpc, laddr + i, is_gc);
 		if (!p) {
 			BUG_ON(is_gc);
 			rrpc_unlock_laddr(rrpc, r);
-			nvm_dev_dma_free(rrpc->dev, rqd->ppa_list,
-							rqd->dma_ppa_list);
+			mempool_free(rrqd, rrpc->rrq_pool);
 			rrpc_gc_kick(rrpc);
 			return NVM_IO_REQUEUE;
 		}
 
-		rqd->ppa_list[i] = rrpc_ppa_to_gaddr(rrpc->dev,
-								p->addr);
+		w_buf = &p->rblk->w_buf;
+		rlun = p->rblk->rlun;
+
+		rrqd->addr = p;
+
+		err = rrpc_write_to_buffer(rrpc, bio, rrqd, p, w_buf, flags);
+		if (err) {
+			pr_err("rrpc: could not write to write buffer\n");
+			return err;
+		}
+
+		bio_advance(bio, RRPC_EXPOSED_PAGE_SIZE);
+
+		queue_work(rrpc->kw_wq, &rlun->ws_writer);
 	}
 
-	rqd->opcode = NVM_OP_HBWRITE;
+	if (kref_put(&rrqd->refs, rrpc_release_and_free_rrqd)) {
+		pr_err("rrpc: request reference counter dailed\n");
+		return NVM_IO_ERR;
+	}
 
-	return NVM_IO_OK;
+	return NVM_IO_DONE;
 }
 
 static int rrpc_write_rq(struct rrpc *rrpc, struct bio *bio,
-				struct nvm_rq *rqd, unsigned long flags)
+				struct rrpc_rq *rrqd, unsigned long flags)
 {
-	struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
+	struct rrpc_w_buf *w_buf;
 	struct rrpc_addr *p;
+	struct rrpc_lun *rlun;
 	int is_gc = flags & NVM_IOTYPE_GC;
+	int err;
 	sector_t laddr = rrpc_get_laddr(bio);
 
-	if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd))
+	if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
+		mempool_free(rrqd, rrpc->rrq_pool);
 		return NVM_IO_REQUEUE;
+	}
 
 	p = rrpc_map_page(rrpc, laddr, is_gc);
 	if (!p) {
 		BUG_ON(is_gc);
-		rrpc_unlock_rq(rrpc, rqd);
+		rrpc_unlock_rq(rrpc, rrqd);
+		mempool_free(rrqd, rrpc->rrq_pool);
 		rrpc_gc_kick(rrpc);
 		return NVM_IO_REQUEUE;
 	}
 
-	rqd->ppa_addr = rrpc_ppa_to_gaddr(rrpc->dev, p->addr);
-	rqd->opcode = NVM_OP_HBWRITE;
+	w_buf = &p->rblk->w_buf;
+	rlun = p->rblk->rlun;
+
 	rrqd->addr = p;
 
-	return NVM_IO_OK;
+	err = rrpc_write_to_buffer(rrpc, bio, rrqd, p, w_buf, flags);
+	if (err) {
+		pr_err("rrpc: could not write to write buffer\n");
+		return err;
+	}
+
+	queue_work(rrpc->kw_wq, &rlun->ws_writer);
+	return NVM_IO_DONE;
 }
 
-static int rrpc_setup_rq(struct rrpc *rrpc, struct bio *bio,
-			struct nvm_rq *rqd, unsigned long flags, uint8_t npages)
+static int rrpc_submit_read(struct rrpc *rrpc, struct bio *bio,
+				struct rrpc_rq *rrqd, unsigned long flags)
 {
-	if (npages > 1) {
+	struct nvm_rq *rqd;
+	struct rrpc_buf_rq brrqd[rrpc->max_write_pgs];
+	uint8_t nr_pages = rrpc_get_pages(bio);
+	int err;
+
+	rqd = mempool_alloc(rrpc->rq_pool, GFP_KERNEL);
+	if (!rqd) {
+		pr_err_ratelimited("rrpc: not able to queue bio.");
+		bio_io_error(bio);
+		return NVM_IO_ERR;
+	}
+	rqd->metadata = NULL;
+	rqd->priv = rrqd;
+
+	if (nr_pages > 1) {
 		rqd->ppa_list = nvm_dev_dma_alloc(rrpc->dev, GFP_KERNEL,
-							&rqd->dma_ppa_list);
+						&rqd->dma_ppa_list);
 		if (!rqd->ppa_list) {
 			pr_err("rrpc: not able to allocate ppa list\n");
+			mempool_free(rrqd, rrpc->rrq_pool);
+			mempool_free(rqd, rrpc->rq_pool);
 			return NVM_IO_ERR;
 		}
 
-		if (bio_rw(bio) == WRITE)
-			return rrpc_write_ppalist_rq(rrpc, bio, rqd, flags,
-									npages);
-
-		return rrpc_read_ppalist_rq(rrpc, bio, rqd, flags, npages);
+		err = rrpc_read_ppalist_rq(rrpc, bio, rqd, brrqd, flags,
+								nr_pages);
+		if (err) {
+			mempool_free(rrqd, rrpc->rrq_pool);
+			mempool_free(rqd, rrpc->rq_pool);
+			return err;
+		}
+	} else {
+		err = rrpc_read_rq(rrpc, bio, rqd, flags);
+		if (err)
+			return err;
 	}
 
-	if (bio_rw(bio) == WRITE)
-		return rrpc_write_rq(rrpc, bio, rqd, flags);
-
-	return rrpc_read_rq(rrpc, bio, rqd, flags);
-}
-
-static int rrpc_submit_io(struct rrpc *rrpc, struct bio *bio,
-				struct nvm_rq *rqd, unsigned long flags)
-{
-	int err;
-	struct rrpc_rq *rrq = nvm_rq_to_pdu(rqd);
-	uint8_t nr_pages = rrpc_get_pages(bio);
-	int bio_size = bio_sectors(bio) << 9;
-
-	if (bio_size < rrpc->dev->sec_size)
-		return NVM_IO_ERR;
-	else if (bio_size > rrpc->dev->max_rq_size)
-		return NVM_IO_ERR;
-
-	err = rrpc_setup_rq(rrpc, bio, rqd, flags, nr_pages);
-	if (err)
-		return err;
-
 	bio_get(bio);
 	rqd->bio = bio;
 	rqd->ins = &rrpc->instance;
-	rqd->nr_pages = nr_pages;
-	rrq->flags = flags;
+	rqd->nr_pages = rrqd->nr_pages = nr_pages;
+	rqd->flags = flags;
 
 	err = nvm_submit_io(rrpc->dev, rqd);
 	if (err) {
 		pr_err("rrpc: I/O submission failed: %d\n", err);
 		bio_put(bio);
 		if (!(flags & NVM_IOTYPE_GC)) {
-			rrpc_unlock_rq(rrpc, rqd);
+			rrpc_unlock_rq(rrpc, rrqd);
 			if (rqd->nr_pages > 1)
 				nvm_dev_dma_free(rrpc->dev,
 			rqd->ppa_list, rqd->dma_ppa_list);
@@ -895,10 +1113,39 @@  static int rrpc_submit_io(struct rrpc *rrpc, struct bio *bio,
 	return NVM_IO_OK;
 }
 
+static int rrpc_buffer_write(struct rrpc *rrpc, struct bio *bio,
+				struct rrpc_rq *rrqd, unsigned long flags)
+{
+	uint8_t nr_pages = rrpc_get_pages(bio);
+
+	rrqd->nr_pages = nr_pages;
+
+	if (nr_pages > 1)
+		return rrpc_write_ppalist_rq(rrpc, bio, rrqd, flags, nr_pages);
+	else
+		return rrpc_write_rq(rrpc, bio, rrqd, flags);
+}
+
+static int rrpc_submit_io(struct rrpc *rrpc, struct bio *bio,
+				struct rrpc_rq *rrqd, unsigned long flags)
+{
+	int bio_size = bio_sectors(bio) << 9;
+
+	if (bio_size < rrpc->dev->sec_size)
+		return NVM_IO_ERR;
+	else if (bio_size > rrpc->dev->max_rq_size)
+		return NVM_IO_ERR;
+
+	if (bio_rw(bio) == READ)
+		return rrpc_submit_read(rrpc, bio, rrqd, flags);
+
+	return rrpc_buffer_write(rrpc, bio, rrqd, flags);
+}
+
 static blk_qc_t rrpc_make_rq(struct request_queue *q, struct bio *bio)
 {
 	struct rrpc *rrpc = q->queuedata;
-	struct nvm_rq *rqd;
+	struct rrpc_rq *rrqd;
 	int err;
 
 	if (bio->bi_rw & REQ_DISCARD) {
@@ -906,15 +1153,15 @@  static blk_qc_t rrpc_make_rq(struct request_queue *q, struct bio *bio)
 		return BLK_QC_T_NONE;
 	}
 
-	rqd = mempool_alloc(rrpc->rq_pool, GFP_KERNEL);
-	if (!rqd) {
-		pr_err_ratelimited("rrpc: not able to queue bio.");
+	rrqd = mempool_alloc(rrpc->rrq_pool, GFP_KERNEL);
+	if (!rrqd) {
+		pr_err_ratelimited("rrpc: not able to allocate rrqd.");
 		bio_io_error(bio);
 		return BLK_QC_T_NONE;
 	}
-	memset(rqd, 0, sizeof(struct nvm_rq));
+	kref_init(&rrqd->refs);
 
-	err = rrpc_submit_io(rrpc, bio, rqd, NVM_IOTYPE_NONE);
+	err = rrpc_submit_io(rrpc, bio, rrqd, NVM_IOTYPE_NONE);
 	switch (err) {
 	case NVM_IO_OK:
 		return BLK_QC_T_NONE;
@@ -932,10 +1179,221 @@  static blk_qc_t rrpc_make_rq(struct request_queue *q, struct bio *bio)
 		break;
 	}
 
-	mempool_free(rqd, rrpc->rq_pool);
 	return BLK_QC_T_NONE;
 }
 
+static int rrpc_alloc_page_in_bio(struct rrpc *rrpc, struct bio *bio,
+								void *data)
+{
+	struct page *page;
+	int err;
+
+	if (PAGE_SIZE != RRPC_EXPOSED_PAGE_SIZE)
+		return -1;
+
+	page = virt_to_page(data);
+	if (!page) {
+		pr_err("nvm: rrpc: could not alloc page\n");
+		return -1;
+	}
+
+	err = bio_add_page(bio, page, RRPC_EXPOSED_PAGE_SIZE, 0);
+	if (err != RRPC_EXPOSED_PAGE_SIZE) {
+		pr_err("nvm: rrpc: could not add page to bio\n");
+		return -1;
+	}
+
+	return 0;
+}
+
+static void rrpc_submit_write(struct work_struct *work)
+{
+	struct rrpc_lun *rlun = container_of(work, struct rrpc_lun, ws_writer);
+	struct rrpc *rrpc = rlun->rrpc;
+	struct nvm_dev *dev = rrpc->dev;
+	struct rrpc_addr *addr;
+	struct rrpc_rq *rrqd;
+	struct rrpc_buf_rq *brrqd;
+	void *data;
+	struct nvm_rq *rqd;
+	struct rrpc_block *rblk;
+	struct bio *bio;
+	int pgs_to_sync, pgs_avail;
+	int sync = NVM_SYNC_HARD;
+	int err;
+	int i;
+
+	/* Note that OS pages are typically mapped to flash page sectors, which
+	 * are 4K; a flash page might be formed of several sectors. Also,
+	 * controllers typically program flash pages across multiple planes.
+	 * This is the flash programing granurality, and the reason behind the
+	 * sync strategy performed in this write thread.
+	 */
+try:
+	spin_lock(&rlun->parent->lock);
+	list_for_each_entry(rblk, &rlun->open_list, list) {
+		if (!spin_trylock(&rblk->w_buf.w_lock))
+			continue;
+
+		/* If the write thread has already submitted all I/Os in the
+		 * write buffer for this block ignore that the block is in the
+		 * open list; it is on its way to the closed list. This enables
+		 * us to avoid taking a lock on the list.
+		 */
+		if (unlikely(rblk->w_buf.cur_subm == rblk->w_buf.nentries)) {
+			spin_unlock(&rblk->w_buf.w_lock);
+			spin_unlock(&rlun->parent->lock);
+			schedule();
+			goto try;
+		}
+		pgs_avail = rblk->w_buf.cur_mem - rblk->w_buf.cur_subm;
+
+		switch (sync) {
+		case NVM_SYNC_SOFT:
+			pgs_to_sync = (pgs_avail >= rrpc->max_write_pgs) ?
+					rrpc->max_write_pgs : 0;
+			break;
+		case NVM_SYNC_HARD:
+			if (pgs_avail >= rrpc->max_write_pgs)
+				pgs_to_sync = rrpc->max_write_pgs;
+			else if (pgs_avail >= rrpc->min_write_pgs)
+				pgs_to_sync = rrpc->min_write_pgs *
+					(pgs_avail / rrpc->min_write_pgs);
+			else
+				pgs_to_sync = pgs_avail; /* TODO: ADD PADDING */
+			break;
+		case NVM_SYNC_OPORT:
+			if (pgs_avail >= rrpc->max_write_pgs)
+				pgs_to_sync = rrpc->max_write_pgs;
+			else if (pgs_avail >= rrpc->min_write_pgs)
+				pgs_to_sync = rrpc->min_write_pgs *
+					(pgs_avail / rrpc->min_write_pgs);
+			else
+				pgs_to_sync = 0;
+		}
+
+		if (pgs_to_sync == 0) {
+			spin_unlock(&rblk->w_buf.w_lock);
+			continue;
+		}
+
+		bio = bio_alloc(GFP_ATOMIC, pgs_to_sync);
+		if (!bio) {
+			pr_err("nvm: rrpc: could not alloc write bio\n");
+			goto out1;
+		}
+
+		rqd = mempool_alloc(rrpc->rq_pool, GFP_ATOMIC);
+		if (!rqd) {
+			pr_err_ratelimited("rrpc: not able to create w req.");
+			goto out2;
+		}
+		rqd->metadata = NULL;
+
+		brrqd = mempool_alloc(rrpc->m_rrq_pool, GFP_ATOMIC);
+		if (!brrqd) {
+			pr_err_ratelimited("rrpc: not able to create w rea.");
+			goto out3;
+		}
+
+		bio->bi_iter.bi_sector = 0; /* artificial bio */
+		bio->bi_rw = WRITE;
+
+		rqd->opcode = NVM_OP_HBWRITE;
+		rqd->bio = bio;
+		rqd->ins = &rrpc->instance;
+		rqd->nr_pages = pgs_to_sync;
+		rqd->priv = brrqd;
+
+		if (pgs_to_sync == 1) {
+			rrqd = rblk->w_buf.subm->rrqd;
+			addr = rblk->w_buf.subm->addr;
+			rqd->flags = rblk->w_buf.subm->flags;
+			data = rblk->w_buf.subm->data;
+
+			err = rrpc_alloc_page_in_bio(rrpc, bio, data);
+			if (err) {
+				pr_err("rrpc: cannot allocate page in bio\n");
+				goto out4;
+			}
+
+			/* TODO: This address can be skipped */
+			if (addr->addr == ADDR_EMPTY)
+				pr_err_ratelimited("rrpc: submitting empty rq");
+
+			rqd->ppa_addr = rrpc_ppa_to_gaddr(dev, addr->addr);
+
+			brrqd[0].rrqd = rrqd;
+			brrqd[0].addr = addr;
+
+			rblk->w_buf.subm++;
+			rblk->w_buf.cur_subm++;
+
+			goto submit_io;
+		}
+
+		/* This bio will contain several pppas */
+		rqd->ppa_list = nvm_dev_dma_alloc(rrpc->dev, GFP_ATOMIC,
+							&rqd->dma_ppa_list);
+		if (!rqd->ppa_list) {
+			pr_err("rrpc: not able to allocate ppa list\n");
+			goto out4;
+		}
+
+		for (i = 0; i < pgs_to_sync; i++) {
+			rrqd = rblk->w_buf.subm->rrqd;
+			addr = rblk->w_buf.subm->addr;
+			rqd->flags = rblk->w_buf.subm->flags;
+			data = rblk->w_buf.subm->data;
+
+			err = rrpc_alloc_page_in_bio(rrpc, bio, data);
+			if (err) {
+				pr_err("rrpc: cannot allocate page in bio\n");
+				goto out5;
+			}
+
+			/* TODO: This address can be skipped */
+			if (addr->addr == ADDR_EMPTY)
+				pr_err_ratelimited("rrpc: submitting empty rq");
+
+			rqd->ppa_list[i] = rrpc_ppa_to_gaddr(dev, addr->addr);
+
+			brrqd[i].rrqd = rrqd;
+			brrqd[i].addr = addr;
+
+			rblk->w_buf.subm++;
+			rblk->w_buf.cur_subm++;
+		}
+
+submit_io:
+		WARN_ON(rblk->w_buf.cur_subm > rblk->w_buf.nentries);
+
+		spin_unlock(&rblk->w_buf.w_lock);
+
+		err = nvm_submit_io(dev, rqd);
+		if (err) {
+			pr_err("rrpc: I/O submission failed: %d\n", err);
+			mempool_free(rqd, rrpc->rq_pool);
+			bio_put(bio);
+		}
+	}
+
+	spin_unlock(&rlun->parent->lock);
+	return;
+
+out5:
+	nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
+out4:
+	mempool_free(brrqd, rrpc->m_rrq_pool);
+out3:
+	mempool_free(rqd, rrpc->rq_pool);
+out2:
+	bio_put(bio);
+out1:
+	spin_unlock(&rblk->w_buf.w_lock);
+	spin_unlock(&rlun->parent->lock);
+}
+
 static void rrpc_requeue(struct work_struct *work)
 {
 	struct rrpc *rrpc = container_of(work, struct rrpc, ws_requeue);
@@ -978,7 +1436,7 @@  static void rrpc_gc_free(struct rrpc *rrpc)
 
 static int rrpc_gc_init(struct rrpc *rrpc)
 {
-	rrpc->krqd_wq = alloc_workqueue("rrpc-lun", WQ_MEM_RECLAIM|WQ_UNBOUND,
+	rrpc->krqd_wq = alloc_workqueue("rrpc-lun", WQ_MEM_RECLAIM | WQ_UNBOUND,
 								rrpc->nr_luns);
 	if (!rrpc->krqd_wq)
 		return -ENOMEM;
@@ -1080,6 +1538,8 @@  static int rrpc_map_init(struct rrpc *rrpc)
 
 static int rrpc_core_init(struct rrpc *rrpc)
 {
+	struct nvm_dev *dev = rrpc->dev;
+
 	down_write(&rrpc_lock);
 	if (!rrpc_gcb_cache) {
 		rrpc_gcb_cache = kmem_cache_create("rrpc_gcb",
@@ -1089,14 +1549,70 @@  static int rrpc_core_init(struct rrpc *rrpc)
 			return -ENOMEM;
 		}
 
-		rrpc_rq_cache = kmem_cache_create("rrpc_rq",
-				sizeof(struct nvm_rq) + sizeof(struct rrpc_rq),
-				0, 0, NULL);
+		rrpc_rq_cache = kmem_cache_create("nvm_rq",
+					sizeof(struct nvm_rq), 0, 0, NULL);
 		if (!rrpc_rq_cache) {
 			kmem_cache_destroy(rrpc_gcb_cache);
 			up_write(&rrpc_lock);
 			return -ENOMEM;
 		}
+
+		rrpc_rrq_cache = kmem_cache_create("rrpc_rrq",
+					sizeof(struct rrpc_rq), 0, 0, NULL);
+		if (!rrpc_rrq_cache) {
+			kmem_cache_destroy(rrpc_gcb_cache);
+			kmem_cache_destroy(rrpc_rq_cache);
+			up_write(&rrpc_lock);
+			return -ENOMEM;
+		}
+
+		rrpc_buf_rrq_cache = kmem_cache_create("rrpc_m_rrq",
+			rrpc->max_write_pgs * sizeof(struct rrpc_buf_rq),
+			0, 0, NULL);
+		if (!rrpc_buf_rrq_cache) {
+			kmem_cache_destroy(rrpc_gcb_cache);
+			kmem_cache_destroy(rrpc_rq_cache);
+			kmem_cache_destroy(rrpc_rrq_cache);
+			up_write(&rrpc_lock);
+			return -ENOMEM;
+		}
+	}
+
+	/* we assume that sec->sec_size is the same as the page size exposed by
+	 * rrpc (4KB). We need extra logic otherwise
+	 */
+	if (!rrpc_block_cache) {
+		/* Write buffer: Allocate all buffer (for all block) at once. We
+		 * avoid having to allocate a memory from the pool for each IO
+		 * at the cost pre-allocating memory for the whole block when a
+		 * new block is allocated from the media manager.
+		 */
+		rrpc_wb_cache = kmem_cache_create("nvm_wb",
+			dev->pgs_per_blk * dev->sec_per_pg * dev->sec_size,
+			0, 0, NULL);
+		if (!rrpc_wb_cache) {
+			kmem_cache_destroy(rrpc_gcb_cache);
+			kmem_cache_destroy(rrpc_rq_cache);
+			kmem_cache_destroy(rrpc_rrq_cache);
+			kmem_cache_destroy(rrpc_buf_rrq_cache);
+			up_write(&rrpc_lock);
+			return -ENOMEM;
+		}
+
+		/* Write buffer entries */
+		rrpc_block_cache = kmem_cache_create("nvm_entry",
+			dev->pgs_per_blk * dev->sec_per_pg *
+			sizeof(struct buf_entry),
+			0, 0, NULL);
+		if (!rrpc_block_cache) {
+			kmem_cache_destroy(rrpc_gcb_cache);
+			kmem_cache_destroy(rrpc_rq_cache);
+			kmem_cache_destroy(rrpc_rrq_cache);
+			kmem_cache_destroy(rrpc_buf_rrq_cache);
+			kmem_cache_destroy(rrpc_wb_cache);
+			up_write(&rrpc_lock);
+			return -ENOMEM;
+		}
 	}
 	up_write(&rrpc_lock);
 
@@ -1113,17 +1629,45 @@  static int rrpc_core_init(struct rrpc *rrpc)
 	if (!rrpc->rq_pool)
 		return -ENOMEM;
 
+	rrpc->rrq_pool = mempool_create_slab_pool(64, rrpc_rrq_cache);
+	if (!rrpc->rrq_pool)
+		return -ENOMEM;
+
+	rrpc->m_rrq_pool = mempool_create_slab_pool(64, rrpc_buf_rrq_cache);
+	if (!rrpc->m_rrq_pool)
+		return -ENOMEM;
+
+	rrpc->block_pool = mempool_create_slab_pool(8, rrpc_block_cache);
+	if (!rrpc->block_pool)
+		return -ENOMEM;
+
+	rrpc->write_buf_pool = mempool_create_slab_pool(8, rrpc_wb_cache);
+	if (!rrpc->write_buf_pool)
+		return -ENOMEM;
+
 	spin_lock_init(&rrpc->inflights.lock);
 	INIT_LIST_HEAD(&rrpc->inflights.reqs);
 
+	rrpc->kw_wq = alloc_workqueue("rrpc-writer",
+				WQ_MEM_RECLAIM | WQ_UNBOUND, rrpc->nr_luns);
+	if (!rrpc->kw_wq)
+		return -ENOMEM;
+
 	return 0;
 }
 
 static void rrpc_core_free(struct rrpc *rrpc)
 {
+	if (rrpc->kw_wq)
+		destroy_workqueue(rrpc->kw_wq);
+
 	mempool_destroy(rrpc->page_pool);
 	mempool_destroy(rrpc->gcb_pool);
+	mempool_destroy(rrpc->m_rrq_pool);
+	mempool_destroy(rrpc->rrq_pool);
 	mempool_destroy(rrpc->rq_pool);
+	mempool_destroy(rrpc->block_pool);
+	mempool_destroy(rrpc->write_buf_pool);
 }
 
 static void rrpc_luns_free(struct rrpc *rrpc)
@@ -1164,6 +1708,8 @@  static int rrpc_luns_init(struct rrpc *rrpc, int lun_begin, int lun_end)
 		INIT_LIST_HEAD(&rlun->open_list);
 		INIT_LIST_HEAD(&rlun->closed_list);
 
+		INIT_WORK(&rlun->ws_writer, rrpc_submit_write);
+
 		INIT_WORK(&rlun->ws_gc, rrpc_lun_gc);
 		spin_lock_init(&rlun->lock);
 
@@ -1209,6 +1755,7 @@  static void rrpc_exit(void *private)
 
 	flush_workqueue(rrpc->krqd_wq);
 	flush_workqueue(rrpc->kgc_wq);
+	/* flush_workqueue(rrpc->kw_wq); */ /* TODO: Implement flush + padding*/
 
 	rrpc_free(rrpc);
 }
diff --git a/drivers/lightnvm/rrpc.h b/drivers/lightnvm/rrpc.h
index 868e91a..6e188b4 100644
--- a/drivers/lightnvm/rrpc.h
+++ b/drivers/lightnvm/rrpc.h
@@ -49,17 +49,67 @@  struct rrpc_inflight_rq {
 struct rrpc_rq {
 	struct rrpc_inflight_rq inflight_rq;
 	struct rrpc_addr *addr;
+	int nr_pages;
+
+	struct kref refs;
+};
+
+struct rrpc_buf_rq {
+	struct rrpc_addr *addr;
+	struct rrpc_rq *rrqd;
+};
+
+/* Sync strategies from write buffer to media */
+enum {
+	NVM_SYNC_SOFT	= 0x0,		/* Only submit at max_write_pgs
+					 * supported by the device. Typically 64
+					 * pages (256k).
+					 */
+	NVM_SYNC_HARD	= 0x1,		/* Submit the whole buffer. Add padding
+					 * if necessary to respect the device's
+					 * min_write_pgs.
+					 */
+	NVM_SYNC_OPORT	= 0x2,		/* Submit what we can, always respecting
+					 * the device's min_write_pgs.
+					 */
+};
+
+struct buf_entry {
+	struct rrpc_rq *rrqd;
+	void *data;
+	struct rrpc_addr *addr;
 	unsigned long flags;
 };
 
+struct rrpc_w_buf {
+	struct buf_entry *entries;	/* Entries */
+	struct buf_entry *mem;		/* Points to the next writable entry */
+	struct buf_entry *subm;		/* Points to the last submitted entry */
+	int cur_mem;			/* Current memory entry. Follows mem */
+	int cur_subm;			/* Entries have been submitted to dev */
+	int nentries;			/* Number of entries in write buffer */
+
+	void *data;			/* Actual data */
+	unsigned long *sync_bitmap;	/* Bitmap representing physical
+					 * addresses that have been synced to
+					 * the media
+					 */
+
+	atomic_t refs;
+
+	spinlock_t w_lock;
+	spinlock_t s_lock;
+};
+
 struct rrpc_block {
 	struct nvm_block *parent;
 	struct rrpc_lun *rlun;
 	struct list_head prio;
 	struct list_head list;
+	struct rrpc_w_buf w_buf;
 
 #define MAX_INVALID_PAGES_STORAGE 8
-	/* Bitmap for invalid page intries */
+	/* Bitmap for invalid page entries */
 	unsigned long invalid_pages[MAX_INVALID_PAGES_STORAGE];
 	/* points to the next writable page within a block */
 	unsigned int next_page;
@@ -67,7 +117,6 @@  struct rrpc_block {
 	unsigned int nr_invalid_pages;
 
 	spinlock_t lock;
-	atomic_t data_cmnt_size; /* data pages committed to stable storage */
 };
 
 struct rrpc_lun {
@@ -86,6 +135,7 @@  struct rrpc_lun {
 					 */
 
 	struct work_struct ws_gc;
+	struct work_struct ws_writer;
 
 	spinlock_t lock;
 };
@@ -136,10 +186,15 @@  struct rrpc {
 	mempool_t *page_pool;
 	mempool_t *gcb_pool;
 	mempool_t *rq_pool;
+	mempool_t *rrq_pool;
+	mempool_t *m_rrq_pool;
+	mempool_t *block_pool;
+	mempool_t *write_buf_pool;
 
 	struct timer_list gc_timer;
 	struct workqueue_struct *krqd_wq;
 	struct workqueue_struct *kgc_wq;
+	struct workqueue_struct *kw_wq;
 };
 
 struct rrpc_block_gc {
@@ -181,7 +236,7 @@  static inline int request_intersects(struct rrpc_inflight_rq *r,
 }
 
 static int __rrpc_lock_laddr(struct rrpc *rrpc, sector_t laddr,
-			     unsigned pages, struct rrpc_inflight_rq *r)
+				unsigned pages, struct rrpc_inflight_rq *r)
 {
 	sector_t laddr_end = laddr + pages - 1;
 	struct rrpc_inflight_rq *rtmp;
@@ -206,27 +261,26 @@  static int __rrpc_lock_laddr(struct rrpc *rrpc, sector_t laddr,
 }
 
 static inline int rrpc_lock_laddr(struct rrpc *rrpc, sector_t laddr,
-				 unsigned pages,
-				 struct rrpc_inflight_rq *r)
+				unsigned pages,
+				struct rrpc_inflight_rq *r)
 {
 	BUG_ON((laddr + pages) > rrpc->nr_sects);
 
 	return __rrpc_lock_laddr(rrpc, laddr, pages, r);
 }
 
-static inline struct rrpc_inflight_rq *rrpc_get_inflight_rq(struct nvm_rq *rqd)
+static inline struct rrpc_inflight_rq
+				*rrpc_get_inflight_rq(struct rrpc_rq *rrqd)
 {
-	struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
-
 	return &rrqd->inflight_rq;
 }
 
 static inline int rrpc_lock_rq(struct rrpc *rrpc, struct bio *bio,
-							struct nvm_rq *rqd)
+							struct rrpc_rq *rrqd)
 {
 	sector_t laddr = rrpc_get_laddr(bio);
 	unsigned int pages = rrpc_get_pages(bio);
-	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
+	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
 
 	return rrpc_lock_laddr(rrpc, laddr, pages, r);
 }
@@ -241,12 +295,12 @@  static inline void rrpc_unlock_laddr(struct rrpc *rrpc,
 	spin_unlock_irqrestore(&rrpc->inflights.lock, flags);
 }
 
-static inline void rrpc_unlock_rq(struct rrpc *rrpc, struct nvm_rq *rqd)
+static inline void rrpc_unlock_rq(struct rrpc *rrpc, struct rrpc_rq *rrqd)
 {
-	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
-	uint8_t pages = rqd->nr_pages;
+	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
+	uint8_t nr_pages = rrqd->nr_pages;
 
-	BUG_ON((r->l_start + pages) > rrpc->nr_sects);
+	BUG_ON((r->l_start + nr_pages) > rrpc->nr_sects);
 
 	rrpc_unlock_laddr(rrpc, r);
 }
diff --git a/include/linux/lightnvm.h b/include/linux/lightnvm.h
index b94f2d5..eda9743 100644
--- a/include/linux/lightnvm.h
+++ b/include/linux/lightnvm.h
@@ -231,6 +231,8 @@  struct nvm_rq {
 	void *metadata;
 	dma_addr_t dma_metadata;
 
+	void *priv;
+
 	struct completion *wait;
 	nvm_end_io_fn *end_io;
 
@@ -243,12 +245,12 @@  struct nvm_rq {
 
 static inline struct nvm_rq *nvm_rq_from_pdu(void *pdu)
 {
-	return pdu - sizeof(struct nvm_rq);
+	return container_of(pdu, struct nvm_rq, priv);
 }
 
 static inline void *nvm_rq_to_pdu(struct nvm_rq *rqdata)
 {
-	return rqdata + 1;
+	return rqdata->priv;
 }
 
 struct nvm_block;