Hi,

Here is the second version of the patch. It takes only a read lock and
allows parallel btree searches, so there will be only two cacheline bounces
per bio (on down_read and up_read).

If nf == NF_GET_UNLOCKED, the patch only sets b->accessed; it doesn't relink
the lru or manipulate the hold count, so it can be executed concurrently.

Mikulas

Signed-off-by: Mikulas Patocka <mpatocka@xxxxxxxxxx>

---
 drivers/md/dm-bufio.c    | 54 ++++++++++++++++++++++++++++++++++-------------
 include/linux/dm-bufio.h | 14 ++++++++++++
 2 files changed, 54 insertions(+), 14 deletions(-)

Index: linux-dm/drivers/md/dm-bufio.c
===================================================================
--- linux-dm.orig/drivers/md/dm-bufio.c	2022-09-27 17:41:50.000000000 +0200
+++ linux-dm/drivers/md/dm-bufio.c	2022-09-27 17:58:49.000000000 +0200
@@ -81,7 +81,7 @@
  * context.
  */
 struct dm_bufio_client {
-	struct mutex lock;
+	struct rw_semaphore lock;
 	spinlock_t spinlock;
 	bool no_sleep;
 
@@ -145,7 +145,7 @@ struct dm_buffer {
 	unsigned char list_mode;	/* LIST_* */
 	blk_status_t read_error;
 	blk_status_t write_error;
-	unsigned accessed;
+	volatile unsigned accessed;
 	unsigned hold_count;
 	unsigned long state;
 	unsigned long last_accessed;
@@ -174,7 +174,7 @@ static void dm_bufio_lock(struct dm_bufi
 	if (static_branch_unlikely(&no_sleep_enabled) && c->no_sleep)
 		spin_lock_bh(&c->spinlock);
 	else
-		mutex_lock_nested(&c->lock, dm_bufio_in_request());
+		down_write_nested(&c->lock, dm_bufio_in_request());
 }
 
 static int dm_bufio_trylock(struct dm_bufio_client *c)
@@ -182,7 +182,7 @@ static int dm_bufio_trylock(struct dm_bu
 	if (static_branch_unlikely(&no_sleep_enabled) && c->no_sleep)
 		return spin_trylock_bh(&c->spinlock);
 	else
-		return mutex_trylock(&c->lock);
+		return down_write_trylock(&c->lock);
 }
 
 static void dm_bufio_unlock(struct dm_bufio_client *c)
@@ -190,9 +190,21 @@ static void dm_bufio_unlock(struct dm_bu
 	if (static_branch_unlikely(&no_sleep_enabled) && c->no_sleep)
 		spin_unlock_bh(&c->spinlock);
 	else
-		mutex_unlock(&c->lock);
+		up_write(&c->lock);
 }
 
+void dm_bufio_lock_read(struct dm_bufio_client *c)
+{
+	down_read(&c->lock);
+}
+EXPORT_SYMBOL_GPL(dm_bufio_lock_read);
+
+void dm_bufio_unlock_read(struct dm_bufio_client *c)
+{
+	up_read(&c->lock);
+}
+EXPORT_SYMBOL_GPL(dm_bufio_unlock_read);
+
 /*----------------------------------------------------------------*/
 
 /*
@@ -870,7 +882,8 @@ enum new_flag {
 	NF_FRESH = 0,
 	NF_READ = 1,
 	NF_GET = 2,
-	NF_PREFETCH = 3
+	NF_GET_UNLOCKED = 3,
+	NF_PREFETCH = 4
 };
 
 /*
@@ -1013,7 +1026,7 @@ static struct dm_buffer *__bufio_new(str
 	if (b)
 		goto found_buffer;
 
-	if (nf == NF_GET)
+	if (nf == NF_GET || nf == NF_GET_UNLOCKED)
 		return NULL;
 
 	new_b = __alloc_buffer_wait(c, nf);
@@ -1058,12 +1071,17 @@ found_buffer:
 	 * If the user called both dm_bufio_prefetch and dm_bufio_get on
 	 * the same buffer, it would deadlock if we waited.
 	 */
-	if (nf == NF_GET && unlikely(test_bit(B_READING, &b->state)))
+	if ((nf == NF_GET || nf == NF_GET_UNLOCKED) && unlikely(test_bit(B_READING, &b->state)))
 		return NULL;
 
-	b->hold_count++;
-	__relink_lru(b, test_bit(B_DIRTY, &b->state) ||
-		     test_bit(B_WRITING, &b->state));
+	if (nf == NF_GET_UNLOCKED) {
+		if (!b->accessed)
+			b->accessed = 1;
+	} else {
+		b->hold_count++;
+		__relink_lru(b, test_bit(B_DIRTY, &b->state) ||
+			     test_bit(B_WRITING, &b->state));
+	}
 	return b;
 }
 
@@ -1154,6 +1172,16 @@ void *dm_bufio_new(struct dm_bufio_clien
 }
 EXPORT_SYMBOL_GPL(dm_bufio_new);
 
+void *dm_bufio_get_unlocked(struct dm_bufio_client *c, sector_t block)
+{
+	int need_submit;
+	struct dm_buffer *b = __bufio_new(c, block, NF_GET_UNLOCKED, &need_submit, NULL);
+	if (b)
+		return b->data;
+	return NULL;
+}
+EXPORT_SYMBOL_GPL(dm_bufio_get_unlocked);
+
 void dm_bufio_prefetch(struct dm_bufio_client *c,
 		       sector_t block, unsigned n_blocks)
 {
@@ -1777,7 +1805,7 @@ struct dm_bufio_client *dm_bufio_client_
 		c->n_buffers[i] = 0;
 	}
 
-	mutex_init(&c->lock);
+	init_rwsem(&c->lock);
 	spin_lock_init(&c->spinlock);
 	INIT_LIST_HEAD(&c->reserved_buffers);
 	c->need_reserved_buffers = reserved_buffers;
@@ -1856,7 +1884,6 @@ bad:
 	kmem_cache_destroy(c->slab_buffer);
 	dm_io_client_destroy(c->dm_io);
 bad_dm_io:
-	mutex_destroy(&c->lock);
 	kfree(c);
 bad_client:
 	return ERR_PTR(r);
@@ -1904,7 +1931,6 @@ void dm_bufio_client_destroy(struct dm_b
 	kmem_cache_destroy(c->slab_cache);
 	kmem_cache_destroy(c->slab_buffer);
 	dm_io_client_destroy(c->dm_io);
-	mutex_destroy(&c->lock);
 	if (c->no_sleep)
 		static_branch_dec(&no_sleep_enabled);
 	kfree(c);
Index: linux-dm/include/linux/dm-bufio.h
===================================================================
--- linux-dm.orig/include/linux/dm-bufio.h	2022-09-27 17:41:50.000000000 +0200
+++ linux-dm/include/linux/dm-bufio.h	2022-09-27 18:01:21.000000000 +0200
@@ -38,6 +38,13 @@ dm_bufio_client_create(struct block_devi
 void dm_bufio_client_destroy(struct dm_bufio_client *c);
 
 /*
+ * Lock and unlock the bufio client - this is needed if we want to call
+ * dm_bufio_get_unlocked.
+ */
+void dm_bufio_lock_read(struct dm_bufio_client *c);
+void dm_bufio_unlock_read(struct dm_bufio_client *c);
+
+/*
  * Set the sector range.
  * When this function is called, there must be no I/O in progress on the bufio
  * client.
@@ -76,6 +83,13 @@ void *dm_bufio_new(struct dm_bufio_clien
 		   struct dm_buffer **bp);
 
 /*
+ * Like dm_bufio_get, but assume that the client is already locked with
+ * dm_bufio_lock_read/dm_bufio_unlock_read. dm_bufio_release should not be
+ * called; the caller should call dm_bufio_unlock_read to release the buffer.
+ */
+void *dm_bufio_get_unlocked(struct dm_bufio_client *c, sector_t block);
+
+/*
  * Prefetch the specified blocks to the cache.
  * The function starts to read the blocks and returns without waiting for
  * I/O to finish.

--
dm-devel mailing list
dm-devel@xxxxxxxxxx
https://listman.redhat.com/mailman/listinfo/dm-devel