block copy: use asynchronous notification In the dm-snapshot target there may be a large number of copy requests in progress. If every pending copy request consumed a process context, it would put too much load on the system. To avoid this load, we need asynchronous notification when a copy finishes: we can pass a callback to the function blkdev_issue_copy. If the callback is non-NULL, blkdev_issue_copy returns as soon as it has submitted all the copy bios, and the callback is called when the copy operation finishes. With the callback mechanism, there can be a large number of in-progress copy requests and we do not need a process context for each of them. Signed-off-by: Mikulas Patocka <mpatocka@xxxxxxxxxx> --- block/blk-lib.c | 152 ++++++++++++++++++++++++++++++++-------------- block/ioctl.c | 2 include/linux/blk_types.h | 5 - include/linux/blkdev.h | 2 4 files changed, 114 insertions(+), 47 deletions(-) Index: linux-3.16-rc5/block/blk-lib.c =================================================================== --- linux-3.16-rc5.orig/block/blk-lib.c 2014-07-15 15:27:59.000000000 +0200 +++ linux-3.16-rc5/block/blk-lib.c 2014-07-15 16:16:53.000000000 +0200 @@ -305,6 +305,17 @@ int blkdev_issue_zeroout(struct block_de } EXPORT_SYMBOL(blkdev_issue_zeroout); +struct bio_copy_batch { + atomic_long_t done; + int async_error; + int sync_error; + sector_t sync_copied; + atomic64_t first_error; + void (*callback)(void *data, int error); + void *data; + sector_t *copied; +}; + #define BLK_COPY_TIMEOUT (10 * HZ) static void blk_copy_timeout(unsigned long bc_) @@ -329,6 +340,18 @@ static void blk_copy_timeout(unsigned lo bio_endio(bio1, -ETIMEDOUT); } +static void blk_copy_batch_finish(struct bio_copy_batch *batch) +{ + void (*fn)(void *, int) = batch->callback; + void *data = batch->data; + int error = unlikely(batch->sync_error) ?
batch->sync_error : batch->async_error; + if (batch->copied) + *batch->copied = min(batch->sync_copied, (sector_t)atomic64_read(&batch->first_error)); + kfree(batch); + if (fn) + fn(data, error); +} + static void bio_copy_end_io(struct bio *bio, int error) { struct bio_copy *bc = bio->bi_copy; @@ -350,22 +373,22 @@ static void bio_copy_end_io(struct bio * } bio_put(bio); if (atomic_dec_and_test(&bc->in_flight)) { - struct bio_batch *bb = bc->private; + struct bio_copy_batch *batch = bc->batch; if (unlikely(bc->error < 0)) { u64 first_error; - if (!ACCESS_ONCE(bb->error)) - ACCESS_ONCE(bb->error) = bc->error; + if (!ACCESS_ONCE(batch->async_error)) + ACCESS_ONCE(batch->async_error) = bc->error; do { - first_error = atomic64_read(bc->first_error); + first_error = atomic64_read(&batch->first_error); if (bc->offset >= first_error) break; - } while (unlikely(atomic64_cmpxchg(bc->first_error, + } while (unlikely(atomic64_cmpxchg(&batch->first_error, first_error, bc->offset) != first_error)); } del_timer_sync(&bc->timer); kfree(bc); - if (atomic_dec_and_test(&bb->done)) - complete(bb->wait); + if (atomic_long_dec_and_test(&batch->done)) + blk_copy_batch_finish(batch); } } @@ -394,6 +417,18 @@ static unsigned blkdev_copy_merge(struct } } +struct bio_copy_completion { + struct completion wait; + int error; +}; + +static void bio_copy_sync_callback(void *ptr, int error) +{ + struct bio_copy_completion *comp = ptr; + comp->error = error; + complete(&comp->wait); +} + /** * blkdev_issue_copy - queue a copy same operation * @src_bdev: source blockdev @@ -408,69 +443,95 @@ static unsigned blkdev_copy_merge(struct */ int blkdev_issue_copy(struct block_device *src_bdev, sector_t src_sector, struct block_device *dst_bdev, sector_t dst_sector, - sector_t nr_sects, gfp_t gfp_mask, sector_t *copied) + sector_t nr_sects, gfp_t gfp_mask, + void (*callback)(void *, int), void *data, + sector_t *copied) { DECLARE_COMPLETION_ONSTACK(wait); struct request_queue *sq = 
bdev_get_queue(src_bdev); struct request_queue *dq = bdev_get_queue(dst_bdev); unsigned int max_copy_sectors; - struct bio_batch bb; - int ret = 0; - atomic64_t first_error = ATOMIC64_INIT(nr_sects); - sector_t offset = 0; + int ret; + struct bio_copy_batch *batch; + struct bio_copy_completion comp; if (copied) *copied = 0; - if (!sq || !dq) - return -ENXIO; + if (!sq || !dq) { + ret = -ENXIO; + goto end_callback; + } max_copy_sectors = min(sq->limits.max_copy_sectors, dq->limits.max_copy_sectors); - if (max_copy_sectors == 0) - return -EOPNOTSUPP; + if (max_copy_sectors == 0) { + ret = -EOPNOTSUPP; + goto end_callback; + } if (src_sector + nr_sects < src_sector || - dst_sector + nr_sects < dst_sector) - return -EINVAL; + dst_sector + nr_sects < dst_sector) { + ret = -EINVAL; + goto end_callback; + } /* Do not support overlapping copies */ if (src_bdev == dst_bdev && - abs64((u64)dst_sector - (u64)src_sector) < nr_sects) - return -EOPNOTSUPP; + abs64((u64)dst_sector - (u64)src_sector) < nr_sects) { + ret = -EOPNOTSUPP; + goto end_callback; + } + + batch = kmalloc(sizeof(struct bio_copy_batch), gfp_mask); + if (!batch) { + ret = -ENOMEM; + goto end_callback; + } - atomic_set(&bb.done, 1); - bb.error = 0; - bb.wait = &wait; + batch->done = (atomic_long_t)ATOMIC_LONG_INIT(1); + batch->async_error = 0; + batch->sync_error = 0; + batch->sync_copied = 0; + batch->first_error = (atomic64_t)ATOMIC64_INIT(nr_sects); + batch->copied = copied; + if (callback) { + batch->callback = callback; + batch->data = data; + } else { + comp.wait = COMPLETION_INITIALIZER_ONSTACK(comp.wait); + batch->callback = bio_copy_sync_callback; + batch->data = &comp; + } - while (nr_sects && !ACCESS_ONCE(bb.error)) { + while (nr_sects && !ACCESS_ONCE(batch->async_error)) { struct bio *read_bio, *write_bio; struct bio_copy *bc; unsigned chunk = (unsigned)min(nr_sects, (sector_t)max_copy_sectors); chunk = blkdev_copy_merge(src_bdev, sq, READ | REQ_COPY, src_sector, chunk); if (!chunk) { - ret =
-EOPNOTSUPP; + batch->sync_error = -EOPNOTSUPP; break; } chunk = blkdev_copy_merge(dst_bdev, dq, WRITE | REQ_COPY, dst_sector, chunk); if (!chunk) { - ret = -EOPNOTSUPP; + batch->sync_error = -EOPNOTSUPP; break; } bc = kmalloc(sizeof(struct bio_copy), gfp_mask); if (!bc) { - ret = -ENOMEM; + batch->sync_error = -ENOMEM; break; } read_bio = bio_alloc(gfp_mask, 1); if (!read_bio) { kfree(bc); - ret = -ENOMEM; + batch->sync_error = -ENOMEM; break; } @@ -478,7 +539,7 @@ int blkdev_issue_copy(struct block_devic if (!write_bio) { bio_put(read_bio); kfree(bc); - ret = -ENOMEM; + batch->sync_error = -ENOMEM; break; } @@ -486,9 +547,8 @@ int blkdev_issue_copy(struct block_devic bc->error = 1; bc->pair[0] = NULL; bc->pair[1] = NULL; - bc->private = &bb; - bc->first_error = &first_error; - bc->offset = offset; + bc->batch = batch; + bc->offset = batch->sync_copied; spin_lock_init(&bc->spinlock); __setup_timer(&bc->timer, blk_copy_timeout, (unsigned long)bc, TIMER_IRQSAFE); mod_timer(&bc->timer, jiffies + BLK_COPY_TIMEOUT); @@ -505,27 +565,33 @@ int blkdev_issue_copy(struct block_devic write_bio->bi_bdev = dst_bdev; write_bio->bi_copy = bc; - atomic_inc(&bb.done); + atomic_long_inc(&batch->done); submit_bio(READ | REQ_COPY, read_bio); submit_bio(WRITE | REQ_COPY, write_bio); src_sector += chunk; dst_sector += chunk; nr_sects -= chunk; - offset += chunk; + batch->sync_copied += chunk; } - /* Wait for bios in-flight */ - if (!atomic_dec_and_test(&bb.done)) - wait_for_completion_io(&wait); + if (atomic_long_dec_and_test(&batch->done)) + blk_copy_batch_finish(batch); - if (copied) - *copied = min((sector_t)atomic64_read(&first_error), offset); - - if (likely(!ret)) - ret = bb.error; + if (callback) { + return 0; + } else { + wait_for_completion_io(&comp.wait); + return comp.error; + } - return ret; +end_callback: + if (callback) { + callback(data, ret); + return 0; + } else { + return ret; + } } EXPORT_SYMBOL(blkdev_issue_copy); Index: linux-3.16-rc5/include/linux/blk_types.h 
=================================================================== --- linux-3.16-rc5.orig/include/linux/blk_types.h 2014-07-15 15:27:51.000000000 +0200 +++ linux-3.16-rc5/include/linux/blk_types.h 2014-07-15 15:28:46.000000000 +0200 @@ -40,6 +40,8 @@ struct bvec_iter { current bvec */ }; +struct bio_copy_batch; + struct bio_copy { /* * error == 1 - bios are waiting to be paired @@ -49,8 +51,7 @@ struct bio_copy { int error; atomic_t in_flight; struct bio *pair[2]; - void *private; - atomic64_t *first_error; + struct bio_copy_batch *batch; sector_t offset; spinlock_t spinlock; struct timer_list timer; Index: linux-3.16-rc5/include/linux/blkdev.h =================================================================== --- linux-3.16-rc5.orig/include/linux/blkdev.h 2014-07-15 15:27:49.000000000 +0200 +++ linux-3.16-rc5/include/linux/blkdev.h 2014-07-15 15:28:46.000000000 +0200 @@ -1173,7 +1173,7 @@ extern int blkdev_issue_write_same(struc sector_t nr_sects, gfp_t gfp_mask, struct page *page); extern int blkdev_issue_copy(struct block_device *, sector_t, struct block_device *, sector_t, sector_t, gfp_t, - sector_t *); + void (*)(void *, int), void *, sector_t *); extern int blkdev_issue_zeroout(struct block_device *bdev, sector_t sector, sector_t nr_sects, gfp_t gfp_mask); static inline int sb_issue_discard(struct super_block *sb, sector_t block, Index: linux-3.16-rc5/block/ioctl.c =================================================================== --- linux-3.16-rc5.orig/block/ioctl.c 2014-07-15 15:27:49.000000000 +0200 +++ linux-3.16-rc5/block/ioctl.c 2014-07-15 15:28:46.000000000 +0200 @@ -228,7 +228,7 @@ static int blk_ioctl_copy(struct block_d return -EINVAL; ret = blkdev_issue_copy(bdev, src_offset, bdev, dst_offset, len, - GFP_KERNEL, &copied_sec); + GFP_KERNEL, NULL, NULL, &copied_sec); *copied = (uint64_t)copied_sec << 9; -- To unsubscribe from this list: send the line "unsubscribe linux-scsi" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo 
info at http://vger.kernel.org/majordomo-info.html