Re: [RFC 2/4] lightnvm: add write buffering for rrpc

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On 02/04/2016 02:08 PM, Javier González wrote:
> Flash controllers typically define flash pages as a collection of flash
> sectors of typically 4K. Moreover, flash controllers might program flash
> pages across several planes. This defines the write granurality at which
> flash can be programmed. This is different for each flash controller. In
> rrpc, the assumption has been that flash pages are 4K, thus writes could
> be sent out to the media as they were received.
> 
> This patch adds a per-block write buffer to rrpc. Writes are flushed to
> the media in chuncks of the minimum granurality allowed by the
> controller, though the strategy can be changed to, for example, only
> flush at the maximum supported (64 pages in nvme). Apart from enabling
> the use of rrpc in real hardware, this buffering strategy will be used
> for recovery; if a block becomes bad, a new block can be allocated to
> persist valid data.

Great work. The code is nearly 2x'ed in size. Can you add a detailed
explanation on the inner workings of the write buffering to help new
users to understand its logic?

> 
> Signed-off-by: Javier González <javier@xxxxxxxxxxxx>
> ---
>  drivers/lightnvm/rrpc.c  | 851 ++++++++++++++++++++++++++++++++++++++---------
>  drivers/lightnvm/rrpc.h  |  82 ++++-
>  include/linux/lightnvm.h |   6 +-
>  3 files changed, 771 insertions(+), 168 deletions(-)
> 
> diff --git a/drivers/lightnvm/rrpc.c b/drivers/lightnvm/rrpc.c
> index 8187bf3..e9fb19d 100644
> --- a/drivers/lightnvm/rrpc.c
> +++ b/drivers/lightnvm/rrpc.c
> @@ -16,11 +16,12 @@
>  
>  #include "rrpc.h"
>  
> -static struct kmem_cache *rrpc_gcb_cache, *rrpc_rq_cache;
> +static struct kmem_cache *rrpc_gcb_cache, *rrpc_rq_cache, *rrpc_rrq_cache,
> +			*rrpc_buf_rrq_cache, *rrpc_wb_cache, *rrpc_block_cache;
>  static DECLARE_RWSEM(rrpc_lock);
>  
>  static int rrpc_submit_io(struct rrpc *rrpc, struct bio *bio,
> -				struct nvm_rq *rqd, unsigned long flags);
> +				struct rrpc_rq *rqd, unsigned long flags);
>  
>  #define rrpc_for_each_lun(rrpc, rlun, i) \
>  		for ((i) = 0, rlun = &(rrpc)->luns[0]; \
> @@ -62,53 +63,59 @@ static void rrpc_invalidate_range(struct rrpc *rrpc, sector_t slba,
>  	spin_unlock(&rrpc->rev_lock);
>  }
>  
> -static struct nvm_rq *rrpc_inflight_laddr_acquire(struct rrpc *rrpc,
> +static void rrpc_release_and_free_rrqd(struct kref *ref)
> +{
> +	struct rrpc_rq *rrqd = container_of(ref, struct rrpc_rq, refs);
> +	struct rrpc *rrpc = rrqd->addr->rblk->rlun->rrpc;

Wow.. Happy that we got to rrpc :)

> +
> +	rrpc_unlock_rq(rrpc, rrqd);
> +	mempool_free(rrqd, rrpc->rrq_pool);
> +}
> +
> +static struct rrpc_rq *rrpc_inflight_laddr_acquire(struct rrpc *rrpc,
>  					sector_t laddr, unsigned int pages)
>  {
> -	struct nvm_rq *rqd;
> +	struct rrpc_rq *rrqd;
>  	struct rrpc_inflight_rq *inf;
>  
> -	rqd = mempool_alloc(rrpc->rq_pool, GFP_ATOMIC);
> -	if (!rqd)
> +	rrqd = mempool_alloc(rrpc->rrq_pool, GFP_ATOMIC);
> +	if (!rrqd)
>  		return ERR_PTR(-ENOMEM);
> +	kref_init(&rrqd->refs);
>  
> -	inf = rrpc_get_inflight_rq(rqd);
> +	inf = rrpc_get_inflight_rq(rrqd);
>  	if (rrpc_lock_laddr(rrpc, laddr, pages, inf)) {
> -		mempool_free(rqd, rrpc->rq_pool);
> +		mempool_free(rrqd, rrpc->rrq_pool);
>  		return NULL;
>  	}
>  
> -	return rqd;
> +	return rrqd;
>  }
>  
> -static void rrpc_inflight_laddr_release(struct rrpc *rrpc, struct nvm_rq *rqd)
> +static void rrpc_inflight_laddr_release(struct rrpc *rrpc, struct rrpc_rq *rrqd)
>  {
> -	struct rrpc_inflight_rq *inf = rrpc_get_inflight_rq(rqd);
> -
> -	rrpc_unlock_laddr(rrpc, inf);
> -
> -	mempool_free(rqd, rrpc->rq_pool);
> +	kref_put(&rrqd->refs, rrpc_release_and_free_rrqd);
>  }
>  
>  static void rrpc_discard(struct rrpc *rrpc, struct bio *bio)
>  {
>  	sector_t slba = bio->bi_iter.bi_sector / NR_PHY_IN_LOG;
>  	sector_t len = bio->bi_iter.bi_size / RRPC_EXPOSED_PAGE_SIZE;
> -	struct nvm_rq *rqd;
> +	struct rrpc_rq *rrqd;
>  
>  	do {
> -		rqd = rrpc_inflight_laddr_acquire(rrpc, slba, len);
> +		rrqd = rrpc_inflight_laddr_acquire(rrpc, slba, len);
>  		schedule();
> -	} while (!rqd);
> +	} while (!rrqd);
>  
> -	if (IS_ERR(rqd)) {
> +	if (IS_ERR(rrqd)) {
>  		pr_err("rrpc: unable to acquire inflight IO\n");
>  		bio_io_error(bio);
>  		return;
>  	}
>  
>  	rrpc_invalidate_range(rrpc, slba, len);
> -	rrpc_inflight_laddr_release(rrpc, rqd);
> +	rrpc_inflight_laddr_release(rrpc, rrqd);
>  }
>  
>  static int block_is_full(struct rrpc *rrpc, struct rrpc_block *rblk)
> @@ -166,8 +173,6 @@ static void rrpc_set_lun_cur(struct rrpc_lun *rlun, struct rrpc_block *rblk)
>  {
>  	struct rrpc *rrpc = rlun->rrpc;
>  
> -	BUG_ON(!rblk);
> -
>  	if (rlun->cur) {
>  		spin_lock(&rlun->cur->lock);
>  		WARN_ON(!block_is_full(rrpc, rlun->cur));
> @@ -176,12 +181,107 @@ static void rrpc_set_lun_cur(struct rrpc_lun *rlun, struct rrpc_block *rblk)
>  	rlun->cur = rblk;
>  }
>  
> +static void rrpc_free_w_buffer(struct rrpc *rrpc, struct rrpc_block *rblk)
> +{
> +try:
> +	spin_lock(&rblk->w_buf.s_lock);
> +	if (atomic_read(&rblk->w_buf.refs) > 0) {
> +		spin_unlock(&rblk->w_buf.s_lock);
> +		schedule();
> +		goto try;
> +	}
> +
> +	mempool_free(rblk->w_buf.entries, rrpc->block_pool);
> +	rblk->w_buf.entries = NULL;
> +	spin_unlock(&rblk->w_buf.s_lock);
> +
> +	/* TODO: Reuse the same buffers if the block size is the same */
> +	mempool_free(rblk->w_buf.data, rrpc->write_buf_pool);
> +	kfree(rblk->w_buf.sync_bitmap);
> +
> +	rblk->w_buf.mem = NULL;
> +	rblk->w_buf.subm = NULL;
> +	rblk->w_buf.sync_bitmap = NULL;
> +	rblk->w_buf.data = NULL;
> +	rblk->w_buf.nentries = 0;
> +	rblk->w_buf.cur_mem = 0;
> +	rblk->w_buf.cur_subm = 0;
> +}
> +
> +static void rrpc_put_blk(struct rrpc *rrpc, struct rrpc_block *rblk)
> +{
> +	struct rrpc_lun *rlun = rblk->rlun;
> +	struct nvm_lun *lun = rlun->parent;
> +	struct rrpc_w_buf *buf = &rblk->w_buf;
> +
> +try:
> +	spin_lock(&buf->w_lock);
> +	/* Flush inflight I/Os */
> +	if (!bitmap_full(buf->sync_bitmap, buf->cur_mem)) {
> +		spin_unlock(&buf->w_lock);
> +		schedule();
> +		goto try;
> +	}
> +	spin_unlock(&buf->w_lock);
> +
> +	if (rblk->w_buf.entries)
> +		rrpc_free_w_buffer(rrpc, rblk);
> +
> +	spin_lock(&lun->lock);
> +	nvm_put_blk_unlocked(rrpc->dev, rblk->parent);
> +	list_del(&rblk->list);
> +	spin_unlock(&lun->lock);
> +}
> +
> +static void rrpc_put_blks(struct rrpc *rrpc)
> +{
> +	struct rrpc_lun *rlun;
> +	int i;
> +
> +	for (i = 0; i < rrpc->nr_luns; i++) {
> +		rlun = &rrpc->luns[i];
> +		if (rlun->cur)
> +			rrpc_put_blk(rrpc, rlun->cur);
> +		if (rlun->gc_cur)
> +			rrpc_put_blk(rrpc, rlun->gc_cur);
> +	}
> +}
> +
>  static struct rrpc_block *rrpc_get_blk(struct rrpc *rrpc, struct rrpc_lun *rlun,
>  							unsigned long flags)
>  {
> +	struct nvm_dev *dev = rrpc->dev;
>  	struct nvm_lun *lun = rlun->parent;
>  	struct nvm_block *blk;
>  	struct rrpc_block *rblk;
> +	struct buf_entry *entries;
> +	unsigned long *sync_bitmap;
> +	void *data;
> +	int nentries = dev->pgs_per_blk * dev->sec_per_pg;
> +
> +	data = mempool_alloc(rrpc->write_buf_pool, GFP_ATOMIC);
> +	if (!data) {
> +		pr_err("nvm: rrpc: cannot allocate write buffer for block\n");
> +		return NULL;
> +	}
> +
> +	entries = mempool_alloc(rrpc->block_pool, GFP_ATOMIC);
> +	if (!entries) {
> +		pr_err("nvm: rrpc: cannot allocate write buffer for block\n");
> +		mempool_free(data, rrpc->write_buf_pool);
> +		return NULL;
> +	}
> +
> +	/* TODO: Mempool? */

This is not on the fast path. I don't think we need mempools here at all;
they would just take up memory unnecessarily.

> +	sync_bitmap = kmalloc(BITS_TO_LONGS(nentries) *
> +					sizeof(unsigned long), GFP_ATOMIC);
> +	if (!sync_bitmap) {
> +		mempool_free(data, rrpc->write_buf_pool);
> +		mempool_free(entries, rrpc->block_pool);
> +		return NULL;
> +	}
> +
> +	bitmap_zero(sync_bitmap, nentries);
>  
>  	spin_lock(&lun->lock);
>  	blk = nvm_get_blk_unlocked(rrpc->dev, rlun->parent, flags);
> @@ -192,43 +292,34 @@ static struct rrpc_block *rrpc_get_blk(struct rrpc *rrpc, struct rrpc_lun *rlun,
>  	}
>  
>  	rblk = &rlun->blocks[blk->id];
> -	list_add_tail(&rblk->list, &rlun->open_list);
> -	spin_unlock(&lun->lock);
>  
>  	blk->priv = rblk;
> -	bitmap_zero(rblk->invalid_pages, rrpc->dev->pgs_per_blk);
> +	bitmap_zero(rblk->invalid_pages, dev->pgs_per_blk);
>  	rblk->next_page = 0;
>  	rblk->nr_invalid_pages = 0;
> -	atomic_set(&rblk->data_cmnt_size, 0);
> +
> +	rblk->w_buf.data = data;
> +	rblk->w_buf.entries = entries;
> +	rblk->w_buf.sync_bitmap = sync_bitmap;
> +
> +	rblk->w_buf.entries->data  = rblk->w_buf.data;
> +	rblk->w_buf.mem = rblk->w_buf.entries;
> +	rblk->w_buf.subm = rblk->w_buf.entries;
> +	rblk->w_buf.nentries = nentries;
> +	rblk->w_buf.cur_mem = 0;
> +	rblk->w_buf.cur_subm = 0;
> +
> +	atomic_set(&rblk->w_buf.refs, 0);
> +
> +	spin_lock_init(&rblk->w_buf.w_lock);
> +	spin_lock_init(&rblk->w_buf.s_lock);
> +
> +	list_add_tail(&rblk->list, &rlun->open_list);
> +	spin_unlock(&lun->lock);
>  
>  	return rblk;
>  }
>  
> -static void rrpc_put_blk(struct rrpc *rrpc, struct rrpc_block *rblk)
> -{
> -	struct rrpc_lun *rlun = rblk->rlun;
> -	struct nvm_lun *lun = rlun->parent;
> -
> -	spin_lock(&lun->lock);
> -	nvm_put_blk_unlocked(rrpc->dev, rblk->parent);
> -	list_del(&rblk->list);
> -	spin_unlock(&lun->lock);
> -}
> -
> -static void rrpc_put_blks(struct rrpc *rrpc)
> -{
> -	struct rrpc_lun *rlun;
> -	int i;
> -
> -	for (i = 0; i < rrpc->nr_luns; i++) {
> -		rlun = &rrpc->luns[i];
> -		if (rlun->cur)
> -			rrpc_put_blk(rrpc, rlun->cur);
> -		if (rlun->gc_cur)
> -			rrpc_put_blk(rrpc, rlun->gc_cur);
> -	}
> -}
> -
>  static struct rrpc_lun *get_next_lun(struct rrpc *rrpc)
>  {
>  	int next = atomic_inc_return(&rrpc->next_lun);
> @@ -247,6 +338,17 @@ static void rrpc_gc_kick(struct rrpc *rrpc)
>  	}
>  }
>  
> +static void rrpc_writer_kick(struct rrpc *rrpc)
> +{
> +	struct rrpc_lun *rlun;
> +	unsigned int i;
> +
> +	for (i = 0; i < rrpc->nr_luns; i++) {
> +		rlun = &rrpc->luns[i];
> +		queue_work(rrpc->kw_wq, &rlun->ws_writer);
> +	}
> +}
> +
>  /*
>   * timed GC every interval.
>   */
> @@ -282,7 +384,7 @@ static int rrpc_move_valid_pages(struct rrpc *rrpc, struct rrpc_block *rblk)
>  {
>  	struct request_queue *q = rrpc->dev->q;
>  	struct rrpc_rev_addr *rev;
> -	struct nvm_rq *rqd;
> +	struct rrpc_rq *rrqd;
>  	struct bio *bio;
>  	struct page *page;
>  	int slot;
> @@ -306,7 +408,7 @@ static int rrpc_move_valid_pages(struct rrpc *rrpc, struct rrpc_block *rblk)
>  	}
>  
>  	while ((slot = find_first_zero_bit(rblk->invalid_pages,
> -					    nr_pgs_per_blk)) < nr_pgs_per_blk) {
> +					nr_pgs_per_blk)) < nr_pgs_per_blk) {

No need to fix the indentation here.

>  
>  		/* Lock laddr */
>  		phys_addr = (rblk->parent->id * nr_pgs_per_blk) + slot;
> @@ -321,8 +423,8 @@ try:
>  			continue;
>  		}
>  
> -		rqd = rrpc_inflight_laddr_acquire(rrpc, rev->addr, 1);
> -		if (IS_ERR_OR_NULL(rqd)) {
> +		rrqd = rrpc_inflight_laddr_acquire(rrpc, rev->addr, 1);
> +		if (IS_ERR_OR_NULL(rrqd)) {
>  			spin_unlock(&rrpc->rev_lock);
>  			schedule();
>  			goto try;
> @@ -339,14 +441,14 @@ try:
>  		/* TODO: may fail when EXP_PG_SIZE > PAGE_SIZE */
>  		bio_add_pc_page(q, bio, page, RRPC_EXPOSED_PAGE_SIZE, 0);
>  
> -		if (rrpc_submit_io(rrpc, bio, rqd, NVM_IOTYPE_GC)) {
> +		if (rrpc_submit_io(rrpc, bio, rrqd, NVM_IOTYPE_GC)) {
>  			pr_err("rrpc: gc read failed.\n");
> -			rrpc_inflight_laddr_release(rrpc, rqd);
> +			rrpc_inflight_laddr_release(rrpc, rrqd);
>  			goto finished;
>  		}
>  		wait_for_completion_io(&wait);
>  		if (bio->bi_error) {
> -			rrpc_inflight_laddr_release(rrpc, rqd);
> +			rrpc_inflight_laddr_release(rrpc, rrqd);
>  			goto finished;
>  		}
>  
> @@ -363,14 +465,16 @@ try:
>  		/* turn the command around and write the data back to a new
>  		 * address
>  		 */
> -		if (rrpc_submit_io(rrpc, bio, rqd, NVM_IOTYPE_GC)) {
> +		if (rrpc_submit_io(rrpc, bio, rrqd, NVM_IOTYPE_GC)
> +							!= NVM_IO_DONE) {
>  			pr_err("rrpc: gc write failed.\n");
> -			rrpc_inflight_laddr_release(rrpc, rqd);
> +			rrpc_inflight_laddr_release(rrpc, rrqd);
>  			goto finished;
>  		}
> +		bio_endio(bio);
>  		wait_for_completion_io(&wait);
>  
> -		rrpc_inflight_laddr_release(rrpc, rqd);
> +		/* rrpc_inflight_laddr_release(rrpc, rrqd); */

This is commented out.

>  		if (bio->bi_error)
>  			goto finished;
>  
> @@ -514,6 +618,8 @@ static void rrpc_gc_queue(struct work_struct *work)
>  	list_move_tail(&rblk->list, &rlun->closed_list);
>  	spin_unlock(&lun->lock);
>  
> +	rrpc_free_w_buffer(rrpc, rblk);
> +
>  	mempool_free(gcb, rrpc->gcb_pool);
>  	pr_debug("nvm: block '%lu' is full, allow GC (sched)\n",
>  							rblk->parent->id);
> @@ -663,43 +769,72 @@ static void rrpc_run_gc(struct rrpc *rrpc, struct rrpc_block *rblk)
>  	queue_work(rrpc->kgc_wq, &gcb->ws_gc);
>  }
>  
> -static void rrpc_end_io_write(struct rrpc *rrpc, struct rrpc_rq *rrqd,
> -						sector_t laddr, uint8_t npages)
> +static void rrpc_sync_buffer(struct rrpc *rrpc, struct rrpc_addr *p)
>  {
> -	struct rrpc_addr *p;
>  	struct rrpc_block *rblk;
> +	struct rrpc_w_buf *buf;
>  	struct nvm_lun *lun;
> -	int cmnt_size, i;
> +	unsigned long bppa;
>  
> -	for (i = 0; i < npages; i++) {
> -		p = &rrpc->trans_map[laddr + i];
> -		rblk = p->rblk;
> -		lun = rblk->parent->lun;
> +	rblk = p->rblk;
> +	buf = &rblk->w_buf;
> +	lun = rblk->parent->lun;
>  
> -		cmnt_size = atomic_inc_return(&rblk->data_cmnt_size);
> -		if (unlikely(cmnt_size == rrpc->dev->pgs_per_blk))
> -			rrpc_run_gc(rrpc, rblk);
> +	bppa = rrpc->dev->sec_per_blk * rblk->parent->id;
> +
> +	WARN_ON(test_and_set_bit((p->addr - bppa), buf->sync_bitmap));
> +
> +	if (unlikely(bitmap_full(buf->sync_bitmap, buf->nentries))) {
> +		/* Write buffer out-of-bounds */
> +		WARN_ON((buf->cur_mem != buf->nentries) &&
> +					(buf->cur_mem != buf->cur_subm));
> +
> +		rrpc_run_gc(rrpc, rblk);
> +	}
> +}
> +
> +static void rrpc_end_io_write(struct rrpc *rrpc, struct nvm_rq *rqd,
> +							uint8_t nr_pages)
> +{
> +	struct rrpc_buf_rq *brrqd = nvm_rq_to_pdu(rqd);
> +	struct rrpc_rq *rrqd;
> +	int i;
> +
> +	for (i = 0; i < nr_pages; i++) {
> +		rrqd = brrqd[i].rrqd;
> +		rrpc_sync_buffer(rrpc, brrqd[i].addr);
> +		kref_put(&rrqd->refs, rrpc_release_and_free_rrqd);
>  	}
> +
> +	mempool_free(brrqd, rrpc->m_rrq_pool);
> +	rrpc_writer_kick(rrpc);
> +}
> +
> +static void rrpc_end_io_read(struct rrpc *rrpc, struct nvm_rq *rqd,
> +							uint8_t nr_pages)
> +{
> +	struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
> +
> +	if (rqd->flags & NVM_IOTYPE_GC)
> +		return;
> +
> +	rrpc_unlock_rq(rrpc, rrqd);
> +	mempool_free(rrqd, rrpc->rrq_pool);
>  }
>  
>  static void rrpc_end_io(struct nvm_rq *rqd)
>  {
>  	struct rrpc *rrpc = container_of(rqd->ins, struct rrpc, instance);
> -	struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
> -	uint8_t npages = rqd->nr_pages;
> -	sector_t laddr = rrpc_get_laddr(rqd->bio) - npages;
> +	uint8_t nr_pages = rqd->nr_pages;
>  
>  	if (bio_data_dir(rqd->bio) == WRITE)
> -		rrpc_end_io_write(rrpc, rrqd, laddr, npages);
> +		rrpc_end_io_write(rrpc, rqd, nr_pages);
> +	else
> +		rrpc_end_io_read(rrpc, rqd, nr_pages);
>  
>  	bio_put(rqd->bio);
>  
> -	if (rrqd->flags & NVM_IOTYPE_GC)
> -		return;
> -
> -	rrpc_unlock_rq(rrpc, rqd);
> -
> -	if (npages > 1)
> +	if (nr_pages > 1)

Cleanup patches can go in afterwards or before.

>  		nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
>  	if (rqd->metadata)
>  		nvm_dev_dma_free(rrpc->dev, rqd->metadata, rqd->dma_metadata);
> @@ -708,20 +843,24 @@ static void rrpc_end_io(struct nvm_rq *rqd)
>  }
>  
>  static int rrpc_read_ppalist_rq(struct rrpc *rrpc, struct bio *bio,
> -			struct nvm_rq *rqd, unsigned long flags, int npages)
> +			struct nvm_rq *rqd, struct rrpc_buf_rq *brrqd,
> +			unsigned long flags, int nr_pages)
>  {
> -	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
> +	struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
> +	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
>  	struct rrpc_addr *gp;
>  	sector_t laddr = rrpc_get_laddr(bio);
>  	int is_gc = flags & NVM_IOTYPE_GC;
>  	int i;
>  
> -	if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd)) {
> +	if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
>  		nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
> +		mempool_free(rrqd, rrpc->rrq_pool);
> +		mempool_free(rqd, rrpc->rq_pool);

Seems like there is a relationship here between rrq_pool (rrpc_rq) and
rq_pool (nvm_rq).

Could the two be combined to get to a single mempool alloc (and just keep
the nvm_rq mempool)?



>  		return NVM_IO_REQUEUE;
>  	}
>  
> -	for (i = 0; i < npages; i++) {
> +	for (i = 0; i < nr_pages; i++) {
>  		/* We assume that mapping occurs at 4KB granularity */
>  		BUG_ON(!(laddr + i >= 0 && laddr + i < rrpc->nr_sects));
>  		gp = &rrpc->trans_map[laddr + i];
> @@ -734,8 +873,12 @@ static int rrpc_read_ppalist_rq(struct rrpc *rrpc, struct bio *bio,
>  			rrpc_unlock_laddr(rrpc, r);
>  			nvm_dev_dma_free(rrpc->dev, rqd->ppa_list,
>  							rqd->dma_ppa_list);
> +			mempool_free(rrqd, rrpc->rrq_pool);
> +			mempool_free(rqd, rrpc->rq_pool);
>  			return NVM_IO_DONE;
>  		}
> +
> +		brrqd[i].addr = gp;
>  	}
>  
>  	rqd->opcode = NVM_OP_HBREAD;
> @@ -751,8 +894,11 @@ static int rrpc_read_rq(struct rrpc *rrpc, struct bio *bio, struct nvm_rq *rqd,
>  	sector_t laddr = rrpc_get_laddr(bio);
>  	struct rrpc_addr *gp;
>  
> -	if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd))
> +	if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
> +		mempool_free(rrqd, rrpc->rrq_pool);
> +		mempool_free(rqd, rrpc->rq_pool);
>  		return NVM_IO_REQUEUE;
> +	}
>  
>  	BUG_ON(!(laddr >= 0 && laddr < rrpc->nr_sects));
>  	gp = &rrpc->trans_map[laddr];
> @@ -761,7 +907,9 @@ static int rrpc_read_rq(struct rrpc *rrpc, struct bio *bio, struct nvm_rq *rqd,
>  		rqd->ppa_addr = rrpc_ppa_to_gaddr(rrpc->dev, gp->addr);
>  	} else {
>  		BUG_ON(is_gc);
> -		rrpc_unlock_rq(rrpc, rqd);
> +		rrpc_unlock_rq(rrpc, rrqd);
> +		mempool_free(rrqd, rrpc->rrq_pool);
> +		mempool_free(rqd, rrpc->rq_pool);
>  		return NVM_IO_DONE;
>  	}
>  
> @@ -771,120 +919,190 @@ static int rrpc_read_rq(struct rrpc *rrpc, struct bio *bio, struct nvm_rq *rqd,
>  	return NVM_IO_OK;
>  }
>  
> +/*
> + * Copy data from current bio to block write buffer. This if necessary
> + * to guarantee durability if a flash block becomes bad before all pages
> + * are written. This buffer is also used to write at the right page
> + * granurality
> + */
> +static int rrpc_write_to_buffer(struct rrpc *rrpc, struct bio *bio,
> +				struct rrpc_rq *rrqd, struct rrpc_addr *addr,
> +				struct rrpc_w_buf *w_buf,
> +				unsigned long flags)
> +{
> +	struct nvm_dev *dev = rrpc->dev;
> +	unsigned int bio_len = bio_cur_bytes(bio);
> +
> +	if (bio_len != RRPC_EXPOSED_PAGE_SIZE)
> +		return NVM_IO_ERR;
> +
> +	spin_lock(&w_buf->w_lock);
> +
> +	WARN_ON(w_buf->cur_mem == w_buf->nentries);
> +
> +	w_buf->mem->rrqd = rrqd;
> +	w_buf->mem->addr = addr;
> +	w_buf->mem->flags = flags;
> +
> +	memcpy(w_buf->mem->data, bio_data(bio), bio_len);
> +
> +	w_buf->cur_mem++;
> +	if (likely(w_buf->cur_mem < w_buf->nentries)) {
> +		w_buf->mem++;
> +		w_buf->mem->data =
> +				w_buf->data + (w_buf->cur_mem * dev->sec_size);
> +	}
> +
> +	spin_unlock(&w_buf->w_lock);
> +
> +	return 0;
> +}
> +
>  static int rrpc_write_ppalist_rq(struct rrpc *rrpc, struct bio *bio,
> -			struct nvm_rq *rqd, unsigned long flags, int npages)
> +			struct rrpc_rq *rrqd, unsigned long flags, int nr_pages)
>  {
> -	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
> +	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
> +	struct rrpc_w_buf *w_buf;
>  	struct rrpc_addr *p;
> +	struct rrpc_lun *rlun;
>  	sector_t laddr = rrpc_get_laddr(bio);
>  	int is_gc = flags & NVM_IOTYPE_GC;
> +	int err;
>  	int i;
>  
> -	if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd)) {
> -		nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
> +	if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
> +		mempool_free(rrqd, rrpc->rrq_pool);
>  		return NVM_IO_REQUEUE;
>  	}
>  
> -	for (i = 0; i < npages; i++) {
> +	for (i = 0; i < nr_pages; i++) {
> +		kref_get(&rrqd->refs);
> +
>  		/* We assume that mapping occurs at 4KB granularity */
>  		p = rrpc_map_page(rrpc, laddr + i, is_gc);
>  		if (!p) {
>  			BUG_ON(is_gc);
>  			rrpc_unlock_laddr(rrpc, r);
> -			nvm_dev_dma_free(rrpc->dev, rqd->ppa_list,
> -							rqd->dma_ppa_list);
> +			mempool_free(rrqd, rrpc->rrq_pool);
>  			rrpc_gc_kick(rrpc);
>  			return NVM_IO_REQUEUE;
>  		}
>  
> -		rqd->ppa_list[i] = rrpc_ppa_to_gaddr(rrpc->dev,
> -								p->addr);
> +		w_buf = &p->rblk->w_buf;
> +		rlun = p->rblk->rlun;
> +
> +		rrqd->addr = p;
> +
> +		err = rrpc_write_to_buffer(rrpc, bio, rrqd, p, w_buf, flags);
> +		if (err) {
> +			pr_err("rrpc: could not write to write buffer\n");
> +			return err;
> +		}
> +
> +		bio_advance(bio, RRPC_EXPOSED_PAGE_SIZE);
> +
> +		queue_work(rrpc->kw_wq, &rlun->ws_writer);

How about a timer and only the kick when a flush command is sent?

>  	}
>  
> -	rqd->opcode = NVM_OP_HBWRITE;
> +	if (kref_put(&rrqd->refs, rrpc_release_and_free_rrqd)) {
> +		pr_err("rrpc: request reference counter dailed\n");
> +		return NVM_IO_ERR;
> +	}
>  
> -	return NVM_IO_OK;
> +	return NVM_IO_DONE;
>  }
>  
>  static int rrpc_write_rq(struct rrpc *rrpc, struct bio *bio,
> -				struct nvm_rq *rqd, unsigned long flags)
> +				struct rrpc_rq *rrqd, unsigned long flags)
>  {
> -	struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
> +	struct rrpc_w_buf *w_buf;
>  	struct rrpc_addr *p;
> +	struct rrpc_lun *rlun;
>  	int is_gc = flags & NVM_IOTYPE_GC;
> +	int err;
>  	sector_t laddr = rrpc_get_laddr(bio);
>  
> -	if (!is_gc && rrpc_lock_rq(rrpc, bio, rqd))
> +	if (!is_gc && rrpc_lock_rq(rrpc, bio, rrqd)) {
> +		mempool_free(rrqd, rrpc->rrq_pool);
>  		return NVM_IO_REQUEUE;
> +	}
>  
>  	p = rrpc_map_page(rrpc, laddr, is_gc);
>  	if (!p) {
>  		BUG_ON(is_gc);
> -		rrpc_unlock_rq(rrpc, rqd);
> +		rrpc_unlock_rq(rrpc, rrqd);
> +		mempool_free(rrqd, rrpc->rrq_pool);
>  		rrpc_gc_kick(rrpc);
>  		return NVM_IO_REQUEUE;
>  	}
>  
> -	rqd->ppa_addr = rrpc_ppa_to_gaddr(rrpc->dev, p->addr);
> -	rqd->opcode = NVM_OP_HBWRITE;
> +	w_buf = &p->rblk->w_buf;
> +	rlun = p->rblk->rlun;
> +
>  	rrqd->addr = p;
>  
> -	return NVM_IO_OK;
> +	err = rrpc_write_to_buffer(rrpc, bio, rrqd, p, w_buf, flags);
> +	if (err) {
> +		pr_err("rrpc: could not write to write buffer\n");
> +		return err;
> +	}
> +
> +	queue_work(rrpc->kw_wq, &rlun->ws_writer);
> +	return NVM_IO_DONE;
>  }
>  
> -static int rrpc_setup_rq(struct rrpc *rrpc, struct bio *bio,
> -			struct nvm_rq *rqd, unsigned long flags, uint8_t npages)
> +static int rrpc_submit_read(struct rrpc *rrpc, struct bio *bio,
> +				struct rrpc_rq *rrqd, unsigned long flags)
>  {
> -	if (npages > 1) {
> +	struct nvm_rq *rqd;
> +	struct rrpc_buf_rq brrqd[rrpc->max_write_pgs];
> +	uint8_t nr_pages = rrpc_get_pages(bio);
> +	int err;
> +
> +	rqd = mempool_alloc(rrpc->rq_pool, GFP_KERNEL);
> +	if (!rqd) {
> +		pr_err_ratelimited("rrpc: not able to queue bio.");
> +		bio_io_error(bio);
> +		return NVM_IO_ERR;
> +	}
> +	rqd->metadata = NULL;
> +	rqd->priv = rrqd;
> +
> +	if (nr_pages > 1) {
>  		rqd->ppa_list = nvm_dev_dma_alloc(rrpc->dev, GFP_KERNEL,
> -							&rqd->dma_ppa_list);
> +						&rqd->dma_ppa_list);
>  		if (!rqd->ppa_list) {
>  			pr_err("rrpc: not able to allocate ppa list\n");
> +			mempool_free(rrqd, rrpc->rrq_pool);
> +			mempool_free(rqd, rrpc->rq_pool);
>  			return NVM_IO_ERR;
>  		}
>  
> -		if (bio_rw(bio) == WRITE)
> -			return rrpc_write_ppalist_rq(rrpc, bio, rqd, flags,
> -									npages);
> -
> -		return rrpc_read_ppalist_rq(rrpc, bio, rqd, flags, npages);
> +		err = rrpc_read_ppalist_rq(rrpc, bio, rqd, brrqd, flags,
> +								nr_pages);
> +		if (err) {
> +			mempool_free(rrqd, rrpc->rrq_pool);
> +			mempool_free(rqd, rrpc->rq_pool);
> +			return err;
> +		}
> +	} else {
> +		err = rrpc_read_rq(rrpc, bio, rqd, flags);
> +		if (err)
> +			return err;
>  	}
>  
> -	if (bio_rw(bio) == WRITE)
> -		return rrpc_write_rq(rrpc, bio, rqd, flags);
> -
> -	return rrpc_read_rq(rrpc, bio, rqd, flags);
> -}
> -
> -static int rrpc_submit_io(struct rrpc *rrpc, struct bio *bio,
> -				struct nvm_rq *rqd, unsigned long flags)
> -{
> -	int err;
> -	struct rrpc_rq *rrq = nvm_rq_to_pdu(rqd);
> -	uint8_t nr_pages = rrpc_get_pages(bio);
> -	int bio_size = bio_sectors(bio) << 9;
> -
> -	if (bio_size < rrpc->dev->sec_size)
> -		return NVM_IO_ERR;
> -	else if (bio_size > rrpc->dev->max_rq_size)
> -		return NVM_IO_ERR;
> -
> -	err = rrpc_setup_rq(rrpc, bio, rqd, flags, nr_pages);
> -	if (err)
> -		return err;
> -
>  	bio_get(bio);
>  	rqd->bio = bio;
>  	rqd->ins = &rrpc->instance;
> -	rqd->nr_pages = nr_pages;
> -	rrq->flags = flags;
> +	rqd->nr_pages = rrqd->nr_pages = nr_pages;
> +	rqd->flags = flags;
>  
>  	err = nvm_submit_io(rrpc->dev, rqd);
>  	if (err) {
>  		pr_err("rrpc: I/O submission failed: %d\n", err);
>  		bio_put(bio);
>  		if (!(flags & NVM_IOTYPE_GC)) {
> -			rrpc_unlock_rq(rrpc, rqd);
> +			rrpc_unlock_rq(rrpc, rrqd);
>  			if (rqd->nr_pages > 1)
>  				nvm_dev_dma_free(rrpc->dev,
>  			rqd->ppa_list, rqd->dma_ppa_list);
> @@ -895,10 +1113,39 @@ static int rrpc_submit_io(struct rrpc *rrpc, struct bio *bio,
>  	return NVM_IO_OK;
>  }
>  
> +static int rrpc_buffer_write(struct rrpc *rrpc, struct bio *bio,
> +				struct rrpc_rq *rrqd, unsigned long flags)
> +{
> +	uint8_t nr_pages = rrpc_get_pages(bio);
> +
> +	rrqd->nr_pages = nr_pages;
> +
> +	if (nr_pages > 1)
> +		return rrpc_write_ppalist_rq(rrpc, bio, rrqd, flags, nr_pages);
> +	else

You can skip the else here.

> +		return rrpc_write_rq(rrpc, bio, rrqd, flags);
> +}
> +
> +static int rrpc_submit_io(struct rrpc *rrpc, struct bio *bio,
> +				struct rrpc_rq *rrqd, unsigned long flags)
> +{
> +	int bio_size = bio_sectors(bio) << 9;
> +
> +	if (bio_size < rrpc->dev->sec_size)
> +		return NVM_IO_ERR;
> +	else if (bio_size > rrpc->dev->max_rq_size)
> +		return NVM_IO_ERR;

I have a patch incoming that removes this. Properly making
rrpc_submit_io obsolete.

> +
> +	if (bio_rw(bio) == READ)
> +		return rrpc_submit_read(rrpc, bio, rrqd, flags);
> +
> +	return rrpc_buffer_write(rrpc, bio, rrqd, flags);
> +}
> +
>  static blk_qc_t rrpc_make_rq(struct request_queue *q, struct bio *bio)
>  {
>  	struct rrpc *rrpc = q->queuedata;
> -	struct nvm_rq *rqd;
> +	struct rrpc_rq *rrqd;
>  	int err;
>  
>  	if (bio->bi_rw & REQ_DISCARD) {
> @@ -906,15 +1153,15 @@ static blk_qc_t rrpc_make_rq(struct request_queue *q, struct bio *bio)
>  		return BLK_QC_T_NONE;
>  	}
>  
> -	rqd = mempool_alloc(rrpc->rq_pool, GFP_KERNEL);
> -	if (!rqd) {
> -		pr_err_ratelimited("rrpc: not able to queue bio.");
> +	rrqd = mempool_alloc(rrpc->rrq_pool, GFP_KERNEL);
> +	if (!rrqd) {
> +		pr_err_ratelimited("rrpc: not able to allocate rrqd.");
>  		bio_io_error(bio);
>  		return BLK_QC_T_NONE;
>  	}
> -	memset(rqd, 0, sizeof(struct nvm_rq));

It seems like we don't use nvm_rq in the write path? (the writer thread
will do the write itself). Move this inside read? and use the original
nvm_rq for submission?

> +	kref_init(&rrqd->refs);
>  
> -	err = rrpc_submit_io(rrpc, bio, rqd, NVM_IOTYPE_NONE);
> +	err = rrpc_submit_io(rrpc, bio, rrqd, NVM_IOTYPE_NONE);
>  	switch (err) {
>  	case NVM_IO_OK:
>  		return BLK_QC_T_NONE;
> @@ -932,10 +1179,221 @@ static blk_qc_t rrpc_make_rq(struct request_queue *q, struct bio *bio)
>  		break;
>  	}
>  
> -	mempool_free(rqd, rrpc->rq_pool);
>  	return BLK_QC_T_NONE;
>  }
>  
> +static int rrpc_alloc_page_in_bio(struct rrpc *rrpc, struct bio *bio,
> +								void *data)
> +{
> +	struct page *page;
> +	int err;
> +
> +	if (PAGE_SIZE != RRPC_EXPOSED_PAGE_SIZE)
> +		return -1;

return -EINVAL?

This doesn't work on power and sparc architectures (with larger
PAGE_SIZEs). Would be great to handle that as well.

> +
> +	page = virt_to_page(data);
> +	if (!page) {
> +		pr_err("nvm: rrpc: could not alloc page\n");
> +		return -1;
> +	}
> +
> +	err = bio_add_page(bio, page, RRPC_EXPOSED_PAGE_SIZE, 0);
> +	if (err != RRPC_EXPOSED_PAGE_SIZE) {
> +		pr_err("nvm: rrpc: could not add page to bio\n");
> +		return -1;
> +	}
> +
> +	return 0;
> +}
> +
> +static void rrpc_submit_write(struct work_struct *work)
> +{
> +	struct rrpc_lun *rlun = container_of(work, struct rrpc_lun, ws_writer);
> +	struct rrpc *rrpc = rlun->rrpc;
> +	struct nvm_dev *dev = rrpc->dev;
> +	struct rrpc_addr *addr;
> +	struct rrpc_rq *rrqd;
> +	struct rrpc_buf_rq *brrqd;
> +	void *data;
> +	struct nvm_rq *rqd;
> +	struct rrpc_block *rblk;
> +	struct bio *bio;
> +	int pgs_to_sync, pgs_avail;
> +	int sync = NVM_SYNC_HARD;
> +	int err;
> +	int i;
> +
> +	/* Note that OS pages are typically mapped to flash page sectors, which
> +	 * are 4K; a flash page might be formed of several sectors. Also,
> +	 * controllers typically program flash pages across multiple planes.
> +	 * This is the flash programing granurality, and the reason behind the
> +	 * sync strategy performed in this write thread.
> +	 */
> +try:
> +	spin_lock(&rlun->parent->lock);
> +	list_for_each_entry(rblk, &rlun->open_list, list) {
> +		if (!spin_trylock(&rblk->w_buf.w_lock))
> +			continue;
> +
> +		/* If the write thread has already submitted all I/Os in the
> +		 * write buffer for this block ignore that the block is in the
> +		 * open list; it is on its way to the closed list. This enables
> +		 * us to avoid taking a lock on the list.
> +		 */
> +		if (unlikely(rblk->w_buf.cur_subm == rblk->w_buf.nentries)) {
> +			spin_unlock(&rblk->w_buf.w_lock);
> +			spin_unlock(&rlun->parent->lock);
> +			schedule();
> +			goto try;
> +		}
> +		pgs_avail = rblk->w_buf.cur_mem - rblk->w_buf.cur_subm;
> +
> +		switch (sync) {
> +		case NVM_SYNC_SOFT:
> +			pgs_to_sync = (pgs_avail >= rrpc->max_write_pgs) ?
> +					rrpc->max_write_pgs : 0;
> +			break;
> +		case NVM_SYNC_HARD:
> +			if (pgs_avail >= rrpc->max_write_pgs)
> +				pgs_to_sync = rrpc->max_write_pgs;
> +			else if (pgs_avail >= rrpc->min_write_pgs)
> +				pgs_to_sync = rrpc->min_write_pgs *
> +					(pgs_avail / rrpc->min_write_pgs);
> +			else
> +				pgs_to_sync = pgs_avail; /* TODO: ADD PADDING */
> +			break;
> +		case NVM_SYNC_OPORT:
> +			if (pgs_avail >= rrpc->max_write_pgs)
> +				pgs_to_sync = rrpc->max_write_pgs;
> +			else if (pgs_avail >= rrpc->min_write_pgs)
> +				pgs_to_sync = rrpc->min_write_pgs *
> +					(pgs_avail / rrpc->min_write_pgs);
> +			else
> +				pgs_to_sync = 0;
> +		}
> +
> +		if (pgs_to_sync == 0) {
> +			spin_unlock(&rblk->w_buf.w_lock);
> +			continue;
> +		}
> +
> +		bio = bio_alloc(GFP_ATOMIC, pgs_to_sync);
> +		if (!bio) {
> +			pr_err("nvm: rrpc: could not alloc write bio\n");
> +			goto out1;
> +		}
> +
> +		rqd = mempool_alloc(rrpc->rq_pool, GFP_ATOMIC);
> +		if (!rqd) {
> +			pr_err_ratelimited("rrpc: not able to create w req.");
> +			goto out2;
> +		}
> +		rqd->metadata = NULL;
> +
> +		brrqd = mempool_alloc(rrpc->m_rrq_pool, GFP_ATOMIC);
> +		if (!brrqd) {
> +			pr_err_ratelimited("rrpc: not able to create w rea.");
> +			goto out3;
> +		}
> +
> +		bio->bi_iter.bi_sector = 0; /* artificial bio */
> +		bio->bi_rw = WRITE;
> +
> +		rqd->opcode = NVM_OP_HBWRITE;
> +		rqd->bio = bio;
> +		rqd->ins = &rrpc->instance;
> +		rqd->nr_pages = pgs_to_sync;
> +		rqd->priv = brrqd;
> +
> +		if (pgs_to_sync == 1) {
> +			rrqd = rblk->w_buf.subm->rrqd;
> +			addr = rblk->w_buf.subm->addr;
> +			rqd->flags = rblk->w_buf.subm->flags;
> +			data = rblk->w_buf.subm->data;
> +
> +			err = rrpc_alloc_page_in_bio(rrpc, bio, data);
> +			if (err) {
> +				pr_err("rrpc: cannot allocate page in bio\n");
> +				goto out4;
> +			}
> +
> +			/* TODO: This address can be skipped */
> +			if (addr->addr == ADDR_EMPTY)
> +				pr_err_ratelimited("rrpc: submitting empty rq");
> +
> +			rqd->ppa_addr = rrpc_ppa_to_gaddr(dev, addr->addr);
> +
> +			brrqd[0].rrqd = rrqd;
> +			brrqd[0].addr = addr;
> +
> +			rblk->w_buf.subm++;
> +			rblk->w_buf.cur_subm++;
> +
> +			goto submit_io;
> +		}
> +
> +		/* This bio will contain several pppas */
> +		rqd->ppa_list = nvm_dev_dma_alloc(rrpc->dev, GFP_ATOMIC,
> +							&rqd->dma_ppa_list);
> +		if (!rqd->ppa_list) {
> +			pr_err("rrpc: not able to allocate ppa list\n");
> +			goto out4;
> +		}
> +
> +		for (i = 0; i < pgs_to_sync; i++) {
> +			rrqd = rblk->w_buf.subm->rrqd;
> +			addr = rblk->w_buf.subm->addr;
> +			rqd->flags = rblk->w_buf.subm->flags;
> +			data = rblk->w_buf.subm->data;
> +
> +			err = rrpc_alloc_page_in_bio(rrpc, bio, data);
> +			if (err) {
> +				pr_err("rrpc: cannot allocate page in bio\n");
> +				goto out5;
> +			}
> +
> +			/* TODO: This address can be skipped */
> +			if (addr->addr == ADDR_EMPTY)
> +				pr_err_ratelimited("rrpc: submitting empty rq");
> +
> +			rqd->ppa_list[i] = rrpc_ppa_to_gaddr(dev, addr->addr);
> +
> +			brrqd[i].rrqd = rrqd;
> +			brrqd[i].addr = addr;
> +
> +			rblk->w_buf.subm++;
> +			rblk->w_buf.cur_subm++;
> +		}
> +
> +submit_io:
> +		WARN_ON(rblk->w_buf.cur_subm > rblk->w_buf.nentries);
> +
> +		spin_unlock(&rblk->w_buf.w_lock);
> +
> +		err = nvm_submit_io(dev, rqd);
> +		if (err) {
> +			pr_err("rrpc: I/O submission failed: %d\n", err);
> +			mempool_free(rqd, rrpc->rq_pool);
> +			bio_put(bio);
> +		}
> +	}
> +
> +	spin_unlock(&rlun->parent->lock);
> +	return;
> +
> +out5:
> +	nvm_dev_dma_free(rrpc->dev, rqd->ppa_list, rqd->dma_ppa_list);
> +out4:
> +	mempool_free(brrqd, rrpc->m_rrq_pool);
> +out3:
> +	mempool_free(rqd, rrpc->rq_pool);
> +out2:
> +	bio_put(bio);
> +out1:
> +	spin_unlock(&rblk->w_buf.w_lock);
> +	spin_unlock(&rlun->parent->lock);
> +}
> +

The function can at least be split into two parts: deciding what to sync,
and the actual write. The numbered out* labels should be replaced with
more descriptive names.

>  static void rrpc_requeue(struct work_struct *work)
>  {
>  	struct rrpc *rrpc = container_of(work, struct rrpc, ws_requeue);
> @@ -978,7 +1436,7 @@ static void rrpc_gc_free(struct rrpc *rrpc)
>  
>  static int rrpc_gc_init(struct rrpc *rrpc)
>  {
> -	rrpc->krqd_wq = alloc_workqueue("rrpc-lun", WQ_MEM_RECLAIM|WQ_UNBOUND,
> +	rrpc->krqd_wq = alloc_workqueue("rrpc-lun", WQ_MEM_RECLAIM | WQ_UNBOUND,

No need for reformatting

>  								rrpc->nr_luns);
>  	if (!rrpc->krqd_wq)
>  		return -ENOMEM;
> @@ -1080,6 +1538,8 @@ static int rrpc_map_init(struct rrpc *rrpc)
>  
>  static int rrpc_core_init(struct rrpc *rrpc)
>  {
> +	struct nvm_dev *dev = rrpc->dev;
> +
>  	down_write(&rrpc_lock);
>  	if (!rrpc_gcb_cache) {
>  		rrpc_gcb_cache = kmem_cache_create("rrpc_gcb",
> @@ -1089,14 +1549,70 @@ static int rrpc_core_init(struct rrpc *rrpc)
>  			return -ENOMEM;
>  		}
>  
> -		rrpc_rq_cache = kmem_cache_create("rrpc_rq",
> -				sizeof(struct nvm_rq) + sizeof(struct rrpc_rq),
> -				0, 0, NULL);
> +		rrpc_rq_cache = kmem_cache_create("nvm_rq",
> +					sizeof(struct nvm_rq), 0, 0, NULL);
>  		if (!rrpc_rq_cache) {
>  			kmem_cache_destroy(rrpc_gcb_cache);
>  			up_write(&rrpc_lock);
>  			return -ENOMEM;
>  		}
> +
> +		rrpc_rrq_cache = kmem_cache_create("rrpc_rrq",
> +					sizeof(struct rrpc_rq), 0, 0, NULL);
> +		if (!rrpc_rrq_cache) {
> +			kmem_cache_destroy(rrpc_gcb_cache);
> +			kmem_cache_destroy(rrpc_rq_cache);
> +			up_write(&rrpc_lock);
> +			return -ENOMEM;
> +		}
> +
> +		rrpc_buf_rrq_cache = kmem_cache_create("rrpc_m_rrq",
> +			rrpc->max_write_pgs * sizeof(struct rrpc_buf_rq),
> +			0, 0, NULL);
> +		if (!rrpc_buf_rrq_cache) {
> +			kmem_cache_destroy(rrpc_gcb_cache);
> +			kmem_cache_destroy(rrpc_rq_cache);
> +			kmem_cache_destroy(rrpc_rrq_cache);
> +			up_write(&rrpc_lock);
> +			return -ENOMEM;
> +		}
> +	}
> +
> +	/* we assume that sec->sec_size is the same as the page size exposed by
> +	 * rrpc (4KB). We need extra logic otherwise
> +	 */
> +	if (!rrpc_block_cache) {
> +		/* Write buffer: Allocate all buffer (for all block) at once. We
> +		 * avoid having to allocate a memory from the pool for each IO
> +		 * at the cost pre-allocating memory for the whole block when a
> +		 * new block is allocated from the media manager.
> +		 */
> +		rrpc_wb_cache = kmem_cache_create("nvm_wb",
> +			dev->pgs_per_blk * dev->sec_per_pg * dev->sec_size,
> +			0, 0, NULL);

Seems a bit excessive to allocate a mempool for this. Why not do a
vmalloc when the memory is needed? The normal case is 4MB? per
entry, and the cache creates 8 of them in the pool, using 64MB out of
the gate. We are not in a hot path at this point in time, so let's not
waste the resources when we might not use them.

> +		if (!rrpc_wb_cache) {
> +			kmem_cache_destroy(rrpc_gcb_cache);
> +			kmem_cache_destroy(rrpc_rq_cache);
> +			kmem_cache_destroy(rrpc_rrq_cache);
> +			kmem_cache_destroy(rrpc_buf_rrq_cache);
> +			up_write(&rrpc_lock);
> +			return -ENOMEM;
> +		}
> +
> +		/* Write buffer entries */
> +		rrpc_block_cache = kmem_cache_create("nvm_entry",
> +			dev->pgs_per_blk * dev->sec_per_pg *
> +			sizeof(struct buf_entry),
> +			0, 0, NULL);
> +		if (!rrpc_block_cache) {
> +			kmem_cache_destroy(rrpc_gcb_cache);
> +			kmem_cache_destroy(rrpc_rq_cache);
> +			kmem_cache_destroy(rrpc_rrq_cache);
> +			kmem_cache_destroy(rrpc_buf_rrq_cache);
> +			kmem_cache_destroy(rrpc_wb_cache);
> +			up_write(&rrpc_lock);
> +			return -ENOMEM;
> +		}
>  	}
>  	up_write(&rrpc_lock);
>  
> @@ -1113,17 +1629,45 @@ static int rrpc_core_init(struct rrpc *rrpc)
>  	if (!rrpc->rq_pool)
>  		return -ENOMEM;
>  
> +	rrpc->rrq_pool = mempool_create_slab_pool(64, rrpc_rrq_cache);
> +	if (!rrpc->rrq_pool)
> +		return -ENOMEM;
> +
> +	rrpc->m_rrq_pool = mempool_create_slab_pool(64, rrpc_buf_rrq_cache);
> +	if (!rrpc->m_rrq_pool)
> +		return -ENOMEM;
> +
> +	rrpc->block_pool = mempool_create_slab_pool(8, rrpc_block_cache);
> +	if (!rrpc->block_pool)
> +		return -ENOMEM;
> +
> +	rrpc->write_buf_pool = mempool_create_slab_pool(8, rrpc_wb_cache);
> +	if (!rrpc->write_buf_pool)
> +		return -ENOMEM;

A lot of pools. I hope the nvm_rq, rrpc_rq, and rrpc_buf_rq consolidation
can move it back to just an nvm_rq entry.

> +
>  	spin_lock_init(&rrpc->inflights.lock);
>  	INIT_LIST_HEAD(&rrpc->inflights.reqs);
>  
> +	rrpc->kw_wq = alloc_workqueue("rrpc-writer",
> +				WQ_MEM_RECLAIM | WQ_UNBOUND, rrpc->nr_luns);
> +	if (!rrpc->kw_wq)
> +		return -ENOMEM;
> +
>  	return 0;
>  }
>  
>  static void rrpc_core_free(struct rrpc *rrpc)
>  {
> +	if (rrpc->kw_wq)
> +		destroy_workqueue(rrpc->kw_wq);
> +
>  	mempool_destroy(rrpc->page_pool);
>  	mempool_destroy(rrpc->gcb_pool);
> +	mempool_destroy(rrpc->m_rrq_pool);
> +	mempool_destroy(rrpc->rrq_pool);
>  	mempool_destroy(rrpc->rq_pool);
> +	mempool_destroy(rrpc->block_pool);
> +	mempool_destroy(rrpc->write_buf_pool);
>  }
>  
>  static void rrpc_luns_free(struct rrpc *rrpc)
> @@ -1164,6 +1708,8 @@ static int rrpc_luns_init(struct rrpc *rrpc, int lun_begin, int lun_end)
>  		INIT_LIST_HEAD(&rlun->open_list);
>  		INIT_LIST_HEAD(&rlun->closed_list);
>  
> +		INIT_WORK(&rlun->ws_writer, rrpc_submit_write);
> +
>  		INIT_WORK(&rlun->ws_gc, rrpc_lun_gc);
>  		spin_lock_init(&rlun->lock);
>  
> @@ -1209,6 +1755,7 @@ static void rrpc_exit(void *private)
>  
>  	flush_workqueue(rrpc->krqd_wq);
>  	flush_workqueue(rrpc->kgc_wq);
> +	/* flush_workqueue(rrpc->kw_wq); */ /* TODO: Implement flush + padding*/

This can come in an additional patch.

>  
>  	rrpc_free(rrpc);
>  }
> diff --git a/drivers/lightnvm/rrpc.h b/drivers/lightnvm/rrpc.h
> index 868e91a..6e188b4 100644
> --- a/drivers/lightnvm/rrpc.h
> +++ b/drivers/lightnvm/rrpc.h
> @@ -49,17 +49,67 @@ struct rrpc_inflight_rq {
>  struct rrpc_rq {
>  	struct rrpc_inflight_rq inflight_rq;
>  	struct rrpc_addr *addr;
> +	int nr_pages;
> +
> +	struct kref refs;

I think what you really want to do here is to add the ref counter to
rrpc_inflight_addr and then have that decide when to free.
(Also, name it kref.)

> +};
> +
> +struct rrpc_buf_rq {
> +	struct rrpc_addr *addr;
> +	struct rrpc_rq *rrqd;

I'm not sure why we keep both rrpc_buf_rq->addr and rrpc_rq->addr.

> +};
> +
> +/* Sync strategies from write buffer to media */
> +enum {
> +	NVM_SYNC_SOFT	= 0x0,		/* Only submit at max_write_pgs
> +					 * supported by the device. Typically 64
> +					 * pages (256k).
> +					 */
> +	NVM_SYNC_HARD	= 0x1,		/* Submit the whole buffer. Add padding
> +					 * if necessary to respect the device's
> +					 * min_write_pgs.
> +					 */
> +	NVM_SYNC_OPORT	= 0x2,		/* Submit what we can, always respecting
> +					 * the device's min_write_pgs.
> +					 */

That is great. This should probably be exposed as a sysfs option later.
> +};
> +
> +struct buf_entry {
> +	struct rrpc_rq *rrqd;
> +	void *data;
> +	struct rrpc_addr *addr;
>  	unsigned long flags;
>  };
>  
> +struct rrpc_w_buf {
> +	struct buf_entry *entries;	/* Entries */
> +	struct buf_entry *mem;		/* Points to the next writable entry */
> +	struct buf_entry *subm;		/* Points to the last submitted entry */
> +	int cur_mem;			/* Current memory entry. Follows mem */
> +	int cur_subm;			/* Entries have been submitted to dev */
> +	int nentries;			/* Number of entries in write buffer */

nr_entries?

> +
> +	void *data;			/* Actual data */
> +	unsigned long *sync_bitmap;	/* Bitmap representing physical
> +					 * addresses that have been synced to
> +					 * the media
> +					 */
> +
> +	atomic_t refs;

kref? semaphore?

> +
> +	spinlock_t w_lock;
> +	spinlock_t s_lock;

Is s_lock short for submission lock?

> +};
> +
>  struct rrpc_block {
>  	struct nvm_block *parent;
>  	struct rrpc_lun *rlun;
>  	struct list_head prio;
>  	struct list_head list;
> +	struct rrpc_w_buf w_buf;
>  
>  #define MAX_INVALID_PAGES_STORAGE 8
> -	/* Bitmap for invalid page intries */
> +	/* Bitmap for invalid page entries */
>  	unsigned long invalid_pages[MAX_INVALID_PAGES_STORAGE];
>  	/* points to the next writable page within a block */
>  	unsigned int next_page;
> @@ -67,7 +117,6 @@ struct rrpc_block {
>  	unsigned int nr_invalid_pages;
>  
>  	spinlock_t lock;
> -	atomic_t data_cmnt_size; /* data pages committed to stable storage */
>  };
>  
>  struct rrpc_lun {
> @@ -86,6 +135,7 @@ struct rrpc_lun {
>  					 */
>  
>  	struct work_struct ws_gc;
> +	struct work_struct ws_writer;
>  
>  	spinlock_t lock;
>  };
> @@ -136,10 +186,15 @@ struct rrpc {
>  	mempool_t *page_pool;
>  	mempool_t *gcb_pool;
>  	mempool_t *rq_pool;
> +	mempool_t *rrq_pool;
> +	mempool_t *m_rrq_pool;
> +	mempool_t *block_pool;
> +	mempool_t *write_buf_pool;
>  
>  	struct timer_list gc_timer;
>  	struct workqueue_struct *krqd_wq;
>  	struct workqueue_struct *kgc_wq;
> +	struct workqueue_struct *kw_wq;
>  };
>  
>  struct rrpc_block_gc {
> @@ -181,7 +236,7 @@ static inline int request_intersects(struct rrpc_inflight_rq *r,
>  }
>  
>  static int __rrpc_lock_laddr(struct rrpc *rrpc, sector_t laddr,
> -			     unsigned pages, struct rrpc_inflight_rq *r)
> +				unsigned pages, struct rrpc_inflight_rq *r)
>  {
>  	sector_t laddr_end = laddr + pages - 1;
>  	struct rrpc_inflight_rq *rtmp;
> @@ -206,27 +261,26 @@ static int __rrpc_lock_laddr(struct rrpc *rrpc, sector_t laddr,
>  }
>  
>  static inline int rrpc_lock_laddr(struct rrpc *rrpc, sector_t laddr,
> -				 unsigned pages,
> -				 struct rrpc_inflight_rq *r)
> +				unsigned pages,
> +				struct rrpc_inflight_rq *r)
>  {
>  	BUG_ON((laddr + pages) > rrpc->nr_sects);
>  
>  	return __rrpc_lock_laddr(rrpc, laddr, pages, r);
>  }
>  
> -static inline struct rrpc_inflight_rq *rrpc_get_inflight_rq(struct nvm_rq *rqd)
> +static inline struct rrpc_inflight_rq
> +				*rrpc_get_inflight_rq(struct rrpc_rq *rrqd)
>  {
> -	struct rrpc_rq *rrqd = nvm_rq_to_pdu(rqd);
> -
>  	return &rrqd->inflight_rq;
>  }
>  
>  static inline int rrpc_lock_rq(struct rrpc *rrpc, struct bio *bio,
> -							struct nvm_rq *rqd)
> +							struct rrpc_rq *rrqd)
>  {
>  	sector_t laddr = rrpc_get_laddr(bio);
>  	unsigned int pages = rrpc_get_pages(bio);
> -	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
> +	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
>  
>  	return rrpc_lock_laddr(rrpc, laddr, pages, r);
>  }
> @@ -241,12 +295,12 @@ static inline void rrpc_unlock_laddr(struct rrpc *rrpc,
>  	spin_unlock_irqrestore(&rrpc->inflights.lock, flags);
>  }
>  
> -static inline void rrpc_unlock_rq(struct rrpc *rrpc, struct nvm_rq *rqd)
> +static inline void rrpc_unlock_rq(struct rrpc *rrpc, struct rrpc_rq *rrqd)
>  {
> -	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rqd);
> -	uint8_t pages = rqd->nr_pages;
> +	struct rrpc_inflight_rq *r = rrpc_get_inflight_rq(rrqd);
> +	uint8_t nr_pages = rrqd->nr_pages;

The cleanups can be moved to another patch.

>  
> -	BUG_ON((r->l_start + pages) > rrpc->nr_sects);
> +	BUG_ON((r->l_start + nr_pages) > rrpc->nr_sects);
>  
>  	rrpc_unlock_laddr(rrpc, r);
>  }
> diff --git a/include/linux/lightnvm.h b/include/linux/lightnvm.h
> index b94f2d5..eda9743 100644
> --- a/include/linux/lightnvm.h
> +++ b/include/linux/lightnvm.h
> @@ -231,6 +231,8 @@ struct nvm_rq {
>  	void *metadata;
>  	dma_addr_t dma_metadata;
>  
> +	void *priv;
> +
>  	struct completion *wait;
>  	nvm_end_io_fn *end_io;
>  
> @@ -243,12 +245,12 @@ struct nvm_rq {
>  
>  static inline struct nvm_rq *nvm_rq_from_pdu(void *pdu)
>  {
> -	return pdu - sizeof(struct nvm_rq);
> +	return container_of(pdu, struct nvm_rq, priv);
>  }
>  
>  static inline void *nvm_rq_to_pdu(struct nvm_rq *rqdata)
>  {
> -	return rqdata + 1;
> +	return rqdata->priv;
>  }

I think the priv field is unnecessary. This should be expressible in
a single structure. E.g. have the rrpc_rq be a union of the data types
you need when processing either a read or a write. It seems that they no
longer have much in common.


--
To unsubscribe from this list: send the line "unsubscribe linux-block" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html



[Index of Archives]     [Linux RAID]     [Linux SCSI]     [Linux ATA RAID]     [IDE]     [Linux Wireless]     [Linux Kernel]     [ATH6KL]     [Linux Bluetooth]     [Linux Netdev]     [Kernel Newbies]     [Security]     [Git]     [Netfilter]     [Bugtraq]     [Yosemite News]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Device Mapper]

  Powered by Linux