Hi, Mike The monolithic source code (3.2k) has been split into almost 20 *.c files according to functionality and data structures, in an OOP style. The aim of this posting is to share what the split looks like. I believe that just reading the *.h files will convince you that the split is clear. The code is currently tainted with almost 20 version-switch macros and WB* debug macros, but I will clean them up before sending the patch. Again, the latest code can be cloned with git clone https://github.com/akiradeveloper/dm-writeboost.git I will make a few updates to the source code this weekend, so please track the repository to follow the latest version. Below is only a snapshot. Akira ---------- Summary ---------- 33 Makefile 10 bigarray.h 19 cache-alloc.h 10 defer-barrier.h 8 dirty-sync.h 8 flush-daemon.h 10 format-cache.h 24 handle-io.h 16 hashtable.h 18 migrate-daemon.h 7 migrate-modulator.h 12 queue-flush-job.h 8 rambuf.h 13 recover.h 18 segment.h 8 superblock-recorder.h 9 target.h 30 util.h 384 writeboost.h 99 bigarray.c 192 cache-alloc.c 36 defer-barrier.c 33 dirty-sync.c 85 flush-daemon.c 234 format-cache.c 553 handle-io.c 109 hashtable.c 345 migrate-daemon.c 41 migrate-modulator.c 169 queue-flush-job.c 52 rambuf.c 308 recover.c 118 segment.c 61 superblock-recorder.c 376 target.c 126 util.c ---------- Makefile ---------- KERNEL_TREE := /lib/modules/$(shell uname -r)/build # KERNEL_TREE := $(HOME)/linux-$(KERN_VERSION) PWD := $(shell pwd) # EXTRA_CFLAGS += -O0 -DCONFIG_DM_DEBUG -fno-inline #-Wall # EXTRA_CFLAGS += -O2 -UCONFIG_DM_DEBUG obj-m := dm-writeboost.o dm-writeboost-objs := \ target.o \ handle-io.o \ queue-flush-job.o \ flush-daemon.o \ migrate-daemon.o \ migrate-modulator.o \ defer-barrier.o \ superblock-recorder.o \ dirty-sync.o \ bigarray.o \ segment.o \ hashtable.o \ cache-alloc.o \ format-cache.o \ recover.o \ rambuf.o \ util.o all: $(MAKE) -C $(KERNEL_TREE) M=$(PWD) modules clean: $(MAKE) -C $(KERNEL_TREE) M=$(PWD) clean ---------- bigarray.h ---------- #ifndef 
WRITEBOOST_BIGARRAY_H #define WRITEBOOST_BIGARRAY_H #include "writeboost.h" struct bigarray; struct bigarray *make_bigarray(size_t elemsize, size_t nr_elems); void kill_bigarray(struct bigarray *); void *bigarray_at(struct bigarray *, size_t i); #endif ---------- cache-alloc.h ---------- #ifndef WRITEBOOST_CACHE_ALLOC_H #define WRITEBOOST_CACHE_ALLOC_H #include "writeboost.h" #include "segment.h" #include "flush-daemon.h" #include "migrate-daemon.h" #include "migrate-modulator.h" #include "rambuf.h" #include "hashtable.h" #include "superblock-recorder.h" #include "dirty-sync.h" #include "recover.h" #include "defer-barrier.h" #include "handle-io.h" int __must_check resume_cache(struct wb_cache *, struct dm_dev *); void free_cache(struct wb_cache *); #endif ---------- defer-barrier.h ---------- #ifndef WRITEBOOST_DEFER_BARRIER_H #define WRITEBOOST_DEFER_BARRIER_H #include "writeboost.h" #include "queue-flush-job.h" void queue_barrier_io(struct wb_cache *, struct bio *); void flush_barrier_ios(struct work_struct *); void barrier_deadline_proc(unsigned long data); #endif ---------- dirty-sync.h ---------- #ifndef WRITEBOOST_DIRTY_SYNC_H #define WRITEBOOST_DIRTY_SYNC_H #include "writeboost.h" #include "queue-flush-job.h" void sync_proc(struct work_struct *); #endif ---------- flush-daemon.h ---------- #ifndef WRITEBOOST_FLUSH_DAEMON_H #define WRITEBOOST_FLUSH_DAEMON_H #include "writeboost.h" #include "util.h" void flush_proc(struct work_struct *); #endif ---------- format-cache.h ---------- #ifndef WRITEBOOST_FORMAT_CACHE_H #define WRITEBOOST_FORMAT_CACHE_H #include "writeboost.h" #include "util.h" #include "segment.h" int __must_check audit_cache_device(struct dm_dev *, bool *cache_valid); int __must_check format_cache_device(struct dm_dev *); #endif ---------- handle-io.h ---------- #ifndef WRITEBOOST_HANDLE_IO_H #define WRITEBOOST_HANDLE_IO_H #include "writeboost.h" #include "bigarray.h" #include "util.h" #include "defer-barrier.h" #include "hashtable.h" #include 
"segment.h" #include "queue-flush-job.h" int writeboost_map(struct dm_target *, struct bio * #if LINUX_VERSION_CODE < PER_BIO_VERSION , union map_info * #endif ); int writeboost_end_io(struct dm_target *, struct bio *, int error #if LINUX_VERSION_CODE < PER_BIO_VERSION , union map_info * #endif ); void inc_nr_dirty_caches(struct wb_device *); void clear_stat(struct wb_cache *); #endif ---------- hashtable.h ---------- #ifndef WRITEBOOST_HASHTABLE_H #define WRITEBOOST_HASHTABLE_H #include "writeboost.h" #include "segment.h" int __must_check ht_empty_init(struct wb_cache *); cache_nr ht_hash(struct wb_cache *, struct lookup_key *); struct metablock *ht_lookup(struct wb_cache *, struct ht_head *, struct lookup_key *); void ht_register(struct wb_cache *, struct ht_head *, struct lookup_key *, struct metablock *); void ht_del(struct wb_cache *, struct metablock *); void discard_caches_inseg(struct wb_cache *, struct segment_header *); #endif ---------- migrate-daemon.h ---------- #ifndef WRITEBOOST_MIGRATE_DAEMON_H #define WRITEBOOST_MIGRATE_DAEMON_H #include "writeboost.h" #include "util.h" #include "segment.h" u8 atomic_read_mb_dirtiness(struct segment_header *, struct metablock *); void cleanup_mb_if_dirty(struct wb_cache *, struct segment_header *, struct metablock *); void migrate_proc(struct work_struct *); void wait_for_migration(struct wb_cache *, size_t id); #endif ---------- migrate-modulator.h ---------- #ifndef WRITEBOOST_MIGRATE_MODULATOR_H #define WRITEBOOST_MIGRATE_MODULATOR_H #include "writeboost.h" void modulator_proc(struct work_struct *); #endif ---------- queue-flush-job.h ---------- #ifndef WRITEBOOST_QUEUE_FLUSH_JOB #define WRITEBOOST_QUEUE_FLUSH_JOB #include "writeboost.h" #include "segment.h" #include "hashtable.h" #include "util.h" #include "migrate-daemon.h" void queue_current_buffer(struct wb_cache *); void flush_current_buffer(struct wb_cache *); #endif ---------- rambuf.h ---------- #ifndef WRITEBOOST_RAMBUF_H #define WRITEBOOST_RAMBUF_H 
#include "writeboost.h" int __must_check init_rambuf_pool(struct wb_cache *); void free_rambuf_pool(struct wb_cache *); #endif ---------- recover.h ---------- #ifndef WRITEBOOST_RECOVER_H #define WRITEBOOST_RECOVER_H #include "writeboost.h" #include "util.h" #include "segment.h" #include "bigarray.h" #include "hashtable.h" #include "migrate-daemon.h" #include "handle-io.h" int __must_check recover_cache(struct wb_cache *); #endif ---------- segment.h ---------- #ifndef WRITEBOOST_SEGMENT_H #define WRITEBOOST_SEGMENT_H #include "writeboost.h" #include "segment.h" #include "bigarray.h" #include "util.h" int __must_check init_segment_header_array(struct wb_cache *); u64 calc_nr_segments(struct dm_dev *); struct segment_header *get_segment_header_by_id(struct wb_cache *, size_t segment_id); sector_t calc_segment_header_start(size_t segment_idx); sector_t calc_mb_start_sector(struct segment_header *, cache_nr mb_idx); u32 calc_segment_lap(struct wb_cache *, size_t segment_id); struct metablock *mb_at(struct wb_cache *, cache_nr idx); bool is_on_buffer(struct wb_cache *, cache_nr mb_idx); #endif ---------- superblock-recorder.h ---------- #ifndef WRITEBOOST_SUPERBLOCK_RECORDER_H #define WRITEBOOST_SUPERBLOCK_RECORDER_H #include "writeboost.h" #include "util.h" void recorder_proc(struct work_struct *); #endif ---------- target.h ---------- #ifndef WRITEBOOST_TARGET_H #define WRITEBOOST_TARGET_H #include "writeboost.h" #include "format-cache.h" #include "cache-alloc.h" #include "handle-io.h" #include "util.h" #endif ---------- util.h ---------- #ifndef WRITEBOOST_UTIL_H #define WRITEBOOST_UTIL_H #include "writeboost.h" extern struct workqueue_struct *safe_io_wq; extern struct dm_io_client *wb_io_client; void *do_kmalloc_retry(size_t size, gfp_t flags, int lineno); #define kmalloc_retry(size, flags) \ do_kmalloc_retry((size), (flags), __LINE__) int dm_safe_io_internal( struct dm_io_request *, unsigned num_regions, struct dm_io_region *, unsigned long *err_bits, bool thread, 
int lineno); #define dm_safe_io(io_req, num_regions, regions, err_bits, thread) \ dm_safe_io_internal((io_req), (num_regions), (regions), \ (err_bits), (thread), __LINE__) void dm_safe_io_retry_internal( struct dm_io_request *, unsigned num_regions, struct dm_io_region *, bool thread, int lineno); #define dm_safe_io_retry(io_req, num_regions, regions, thread) \ dm_safe_io_retry_internal((io_req), (num_regions), (regions), \ (thread), __LINE__) sector_t dm_devsize(struct dm_dev *); #endif ---------- writeboost.h ---------- /* * Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@xxxxxxxxx> * * This file is released under the GPL. */ #ifndef DM_WRITEBOOST_H #define DM_WRITEBOOST_H #define DM_MSG_PREFIX "writeboost" #include <linux/module.h> #include <linux/version.h> #include <linux/list.h> #include <linux/slab.h> #include <linux/mutex.h> #include <linux/sched.h> #include <linux/timer.h> #include <linux/device-mapper.h> #include <linux/dm-io.h> #define WBERR(f, args...) \ DMERR("err@%d " f, __LINE__, ## args) #define WBWARN(f, args...) \ DMWARN("warn@%d " f, __LINE__, ## args) #define WBINFO(f, args...) \ DMINFO("info@%d " f, __LINE__, ## args) /* * (1 << x) sector. * 4 <= x <= 11 * dm-writeboost supports segment size up to 1MB. * * All the comments are if * the segment size is the maximum 1MB. */ #define WB_SEGMENTSIZE_ORDER 11 /* * By default, * we allocate 64 * 1MB RAM buffers statically. */ #define NR_RAMBUF_POOL 64 /* * The first 4KB (1<<3 sectors) in segment * is for metadata. */ #define NR_CACHES_INSEG ((1 << (WB_SEGMENTSIZE_ORDER - 3)) - 1) /* * The Detail of the Disk Format * * Whole: * Superblock(1MB) Segment(1MB) Segment(1MB) ... * We reserve the first segment (1MB) as the superblock. * * Superblock(1MB): * head <---- ----> tail * superblock header(512B) ... superblock record(512B) * * Segment(1MB): * segment_header_device(4KB) metablock_device(4KB) * NR_CACHES_INSEG */ /* * Superblock Header * First one sector of the super block region. 
* The value is fixed after formatted. */ /* * Magic Number * "WBst" */ #define WRITEBOOST_MAGIC 0x57427374 struct superblock_header_device { __le32 magic; } __packed; /* * Superblock Record (Mutable) * Last one sector of the superblock region. * Record the current cache status in need. */ struct superblock_record_device { __le64 last_migrated_segment_id; } __packed; /* * Cache line index. * * dm-writeboost can supoort a cache device * with size less than 4KB * (1 << 32) * that is 16TB. */ typedef u32 cache_nr; /* * Metadata of a 4KB cache line * * Dirtiness is defined for each sector * in this cache line. */ struct metablock { sector_t sector; /* key */ cache_nr idx; /* Const */ struct hlist_node ht_list; /* * 8 bit flag for dirtiness * for each sector in cache line. * * Current implementation * only recovers dirty caches. * Recovering clean caches complicates the code * but couldn't be effective * since only few of the caches are clean. */ u8 dirty_bits; }; /* * On-disk metablock */ struct metablock_device { __le64 sector; u8 dirty_bits; __le32 lap; } __packed; #define SZ_MAX (~(size_t)0) struct segment_header { struct metablock mb_array[NR_CACHES_INSEG]; /* * ID uniformly increases. * ID 0 is used to tell that the segment is invalid * and valid id >= 1. */ u64 global_id; /* * Segment can be flushed half-done. * length is the number of * metablocks that must be counted in * in resuming. */ u8 length; cache_nr start_idx; /* Const */ sector_t start_sector; /* Const */ struct list_head migrate_list; /* * This segment can not be migrated * to backin store * until flushed. * Flushed segment is in cache device. */ struct completion flush_done; /* * This segment can not be overwritten * until migrated. */ struct completion migrate_done; spinlock_t lock; atomic_t nr_inflight_ios; }; /* * (Locking) * Locking metablocks by their granularity * needs too much memory space for lock structures. 
* We only locks a metablock by locking the parent segment * that includes the metablock. */ #define lockseg(seg, flags) spin_lock_irqsave(&(seg)->lock, flags) #define unlockseg(seg, flags) spin_unlock_irqrestore(&(seg)->lock, flags) /* * On-disk segment header. * * Must be at most 4KB large. */ struct segment_header_device { /* - FROM - At most512 byte for atomicity. --- */ __le64 global_id; /* * How many cache lines in this segments * should be counted in resuming. */ u8 length; /* * On what lap in rorating on cache device * used to find the head and tail in the * segments in cache device. */ __le32 lap; /* - TO -------------------------------------- */ /* This array must locate at the tail */ struct metablock_device mbarr[NR_CACHES_INSEG]; } __packed; struct rambuffer { void *data; struct completion done; }; enum STATFLAG { STAT_WRITE = 0, STAT_HIT, STAT_ON_BUFFER, STAT_FULLSIZE, }; #define STATLEN (1 << 4) struct lookup_key { sector_t sector; }; struct ht_head { struct hlist_head ht_list; }; struct wb_device; struct wb_cache { struct wb_device *wb; struct dm_dev *device; struct mutex io_lock; cache_nr nr_caches; /* Const */ u64 nr_segments; /* Const */ struct bigarray *segment_header_array; /* * Chained hashtable * * Writeboost uses chained hashtable * to cache lookup. * Cache discarding often happedns * This structure fits our needs. */ struct bigarray *htable; size_t htsize; struct ht_head *null_head; cache_nr cursor; /* Index that has been written the most lately */ struct segment_header *current_seg; struct rambuffer *current_rambuf; struct rambuffer *rambuf_pool; u64 last_migrated_segment_id; u64 last_flushed_segment_id; u64 reserving_segment_id; /* * Flush daemon * * Writeboost first queue the segment to flush * and flush daemon asynchronously * flush them to the cache device. 
*/ struct work_struct flush_work; struct workqueue_struct *flush_wq; spinlock_t flush_queue_lock; struct list_head flush_queue; wait_queue_head_t flush_wait_queue; /* * Deferred ACK for barriers. */ struct work_struct barrier_deadline_work; struct timer_list barrier_deadline_timer; struct bio_list barrier_ios; unsigned long barrier_deadline_ms; /* param */ /* * Migration daemon * * Migartion also works in background. * * If allow_migrate is true, * migrate daemon goes into migration * if they are segments to migrate. */ struct work_struct migrate_work; struct workqueue_struct *migrate_wq; bool allow_migrate; /* param */ /* * Batched Migration * * Migration is done atomically * with number of segments batched. */ wait_queue_head_t migrate_wait_queue; atomic_t migrate_fail_count; atomic_t migrate_io_count; struct list_head migrate_list; u8 *dirtiness_snapshot; void *migrate_buffer; size_t nr_cur_batched_migration; size_t nr_max_batched_migration; /* param */ /* * Migration modulator * * This daemon turns on and off * the migration * according to the load of backing store. */ struct work_struct modulator_work; bool enable_migration_modulator; /* param */ /* * Superblock Recorder * * Update the superblock record * periodically. */ struct work_struct recorder_work; unsigned long update_record_interval; /* param */ /* * Cache Synchronizer * * Sync the dirty writes * periodically. */ struct work_struct sync_work; unsigned long sync_interval; /* param */ /* * on_terminate is true * to notify all the background daemons to * stop their operations. */ bool on_terminate; atomic64_t stat[STATLEN]; }; struct wb_device { struct dm_target *ti; struct dm_dev *device; struct wb_cache *cache; u8 migrate_threshold; atomic64_t nr_dirty_caches; }; struct flush_job { struct list_head flush_queue; struct segment_header *seg; /* * The data to flush to cache device. */ struct rambuffer *rambuf; /* * List of bios with barrier flags. 
*/ struct bio_list barrier_ios; }; #define PER_BIO_VERSION KERNEL_VERSION(3, 8, 0) #if LINUX_VERSION_CODE >= PER_BIO_VERSION struct per_bio_data { void *ptr; }; #endif #endif ---------- bigarray.c ---------- /* * Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@xxxxxxxxx> * * This file is released under the GPL. */ /* * A array like structure * that can contain million of elements. * The aim of this class is the same as * flex_array. * The reason we don't use flex_array is * that the class trades the performance * to get the resizability. * struct arr is fast and light-weighted. */ #include "bigarray.h" struct part { void *memory; }; struct bigarray { struct part *parts; size_t nr_elems; size_t elemsize; }; #define ALLOC_SIZE (1 << 16) static size_t nr_elems_in_part(struct bigarray *arr) { return ALLOC_SIZE / arr->elemsize; }; static size_t nr_parts(struct bigarray *arr) { return dm_div_up(arr->nr_elems, nr_elems_in_part(arr)); } struct bigarray *make_bigarray(size_t elemsize, size_t nr_elems) { size_t i, j; struct part *part; struct bigarray *arr = kmalloc(sizeof(*arr), GFP_KERNEL); if (!arr) { WBERR(); return NULL; } arr->elemsize = elemsize; arr->nr_elems = nr_elems; arr->parts = kmalloc(sizeof(struct part) * nr_parts(arr), GFP_KERNEL); if (!arr->parts) { WBERR(); goto bad_alloc_parts; } for (i = 0; i < nr_parts(arr); i++) { part = arr->parts + i; part->memory = kmalloc(ALLOC_SIZE, GFP_KERNEL); if (!part->memory) { WBERR(); for (j = 0; j < i; j++) { part = arr->parts + j; kfree(part->memory); } goto bad_alloc_parts_memory; } } return arr; bad_alloc_parts_memory: kfree(arr->parts); bad_alloc_parts: kfree(arr); return NULL; } void kill_bigarray(struct bigarray *arr) { size_t i; for (i = 0; i < nr_parts(arr); i++) { struct part *part = arr->parts + i; kfree(part->memory); } kfree(arr->parts); kfree(arr); } void *bigarray_at(struct bigarray *arr, size_t i) { size_t n = nr_elems_in_part(arr); size_t j = i / n; size_t k = i % n; struct part *part = arr->parts + j; 
return part->memory + (arr->elemsize * k); } ---------- cache-alloc.c ---------- /* * Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@xxxxxxxxx> * * This file is released under the GPL. */ /* * Cache resume/free operations are provided. * Resuming a cache is to construct in-core * metadata structures from the metadata * region in the cache device. */ #include "cache-alloc.h" int __must_check resume_cache(struct wb_cache *cache, struct dm_dev *dev) { int r = 0; cache->device = dev; cache->nr_segments = calc_nr_segments(cache->device); cache->nr_caches = cache->nr_segments * NR_CACHES_INSEG; cache->on_terminate = false; cache->allow_migrate = true; cache->reserving_segment_id = 0; mutex_init(&cache->io_lock); cache->enable_migration_modulator = true; cache->update_record_interval = 60; cache->sync_interval = 60; r = init_rambuf_pool(cache); if (r) { WBERR(); goto bad_init_rambuf_pool; } /* * Select arbitrary one as the initial rambuffer. */ cache->current_rambuf = cache->rambuf_pool + 0; r = init_segment_header_array(cache); if (r) { WBERR(); goto bad_alloc_segment_header_array; } r = ht_empty_init(cache); if (r) { WBERR(); goto bad_alloc_ht; } /* * All in-core structures are allocated and * initialized. * Next, read metadata from the cache device. 
*/ r = recover_cache(cache); if (r) { WBERR(); goto bad_recover; } /* Data structures for Migration */ cache->migrate_buffer = vmalloc(NR_CACHES_INSEG << 12); if (!cache->migrate_buffer) { WBERR(); goto bad_alloc_migrate_buffer; } cache->dirtiness_snapshot = kmalloc( NR_CACHES_INSEG, GFP_KERNEL); if (!cache->dirtiness_snapshot) { WBERR(); goto bad_alloc_dirtiness_snapshot; } cache->migrate_wq = create_singlethread_workqueue("migratewq"); if (!cache->migrate_wq) { WBERR(); goto bad_migratewq; } cache->flush_wq = create_singlethread_workqueue("flushwq"); if (!cache->flush_wq) { WBERR(); goto bad_flushwq; } /* Migration Daemon */ INIT_WORK(&cache->migrate_work, migrate_proc); init_waitqueue_head(&cache->migrate_wait_queue); INIT_LIST_HEAD(&cache->migrate_list); atomic_set(&cache->migrate_fail_count, 0); atomic_set(&cache->migrate_io_count, 0); cache->nr_max_batched_migration = 1; cache->nr_cur_batched_migration = 1; queue_work(cache->migrate_wq, &cache->migrate_work); /* Deferred ACK for barrier writes */ setup_timer(&cache->barrier_deadline_timer, barrier_deadline_proc, (unsigned long) cache); bio_list_init(&cache->barrier_ios); /* * Deadline is 3 ms by default. * 2.5 us to process on bio * and 3 ms is enough long to process 255 bios. * If the buffer doesn't get full within 3 ms, * we can doubt write starves * by waiting formerly submitted barrier to be complete. 
*/ cache->barrier_deadline_ms = 3; INIT_WORK(&cache->barrier_deadline_work, flush_barrier_ios); /* Flush Daemon */ INIT_WORK(&cache->flush_work, flush_proc); spin_lock_init(&cache->flush_queue_lock); INIT_LIST_HEAD(&cache->flush_queue); init_waitqueue_head(&cache->flush_wait_queue); queue_work(cache->flush_wq, &cache->flush_work); /* Migartion Modulator */ INIT_WORK(&cache->modulator_work, modulator_proc); schedule_work(&cache->modulator_work); /* Superblock Recorder */ INIT_WORK(&cache->recorder_work, recorder_proc); schedule_work(&cache->recorder_work); /* Dirty Synchronizer */ INIT_WORK(&cache->sync_work, sync_proc); schedule_work(&cache->sync_work); clear_stat(cache); return 0; bad_flushwq: destroy_workqueue(cache->migrate_wq); bad_migratewq: kfree(cache->dirtiness_snapshot); bad_alloc_dirtiness_snapshot: vfree(cache->migrate_buffer); bad_alloc_migrate_buffer: bad_recover: kill_bigarray(cache->htable); bad_alloc_ht: kill_bigarray(cache->segment_header_array); bad_alloc_segment_header_array: free_rambuf_pool(cache); bad_init_rambuf_pool: kfree(cache); return r; } void free_cache(struct wb_cache *cache) { cache->on_terminate = true; /* Kill in-kernel daemons */ cancel_work_sync(&cache->sync_work); cancel_work_sync(&cache->recorder_work); cancel_work_sync(&cache->modulator_work); cancel_work_sync(&cache->flush_work); destroy_workqueue(cache->flush_wq); cancel_work_sync(&cache->barrier_deadline_work); cancel_work_sync(&cache->migrate_work); destroy_workqueue(cache->migrate_wq); kfree(cache->dirtiness_snapshot); vfree(cache->migrate_buffer); /* Destroy in-core structures */ kill_bigarray(cache->htable); kill_bigarray(cache->segment_header_array); free_rambuf_pool(cache); } ---------- defer-barrier.c ---------- /* * Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@xxxxxxxxx> * * This file is released under the GPL. 
*/ #include "defer-barrier.h" void queue_barrier_io(struct wb_cache *cache, struct bio *bio) { mutex_lock(&cache->io_lock); bio_list_add(&cache->barrier_ios, bio); mutex_unlock(&cache->io_lock); if (!timer_pending(&cache->barrier_deadline_timer)) mod_timer(&cache->barrier_deadline_timer, msecs_to_jiffies(cache->barrier_deadline_ms)); } void barrier_deadline_proc(unsigned long data) { struct wb_cache *cache = (struct wb_cache *) data; schedule_work(&cache->barrier_deadline_work); } void flush_barrier_ios(struct work_struct *work) { struct wb_cache *cache = container_of(work, struct wb_cache, barrier_deadline_work); if (bio_list_empty(&cache->barrier_ios)) return; flush_current_buffer(cache); } ---------- dirty-sync.c ---------- /* * Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@xxxxxxxxx> * * This file is released under the GPL. */ #include "dirty-sync.h" void sync_proc(struct work_struct *work) { struct wb_cache *cache = container_of(work, struct wb_cache, sync_work); unsigned long intvl; while (true) { if (cache->on_terminate) return; /* sec -> ms */ intvl = cache->sync_interval * 1000; if (!intvl) { schedule_timeout_interruptible(msecs_to_jiffies(1000)); continue; } WBINFO(); flush_current_buffer(cache); blkdev_issue_flush(cache->device->bdev, GFP_NOIO, NULL); schedule_timeout_interruptible(msecs_to_jiffies(intvl)); } } ---------- flush-daemon.c ---------- /* * Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@xxxxxxxxx> * * This file is released under the GPL. 
*/ #include "flush-daemon.h" void flush_proc(struct work_struct *work) { unsigned long flags; struct wb_cache *cache = container_of(work, struct wb_cache, flush_work); while (true) { struct flush_job *job; struct segment_header *seg; struct dm_io_request io_req; struct dm_io_region region; WBINFO(); spin_lock_irqsave(&cache->flush_queue_lock, flags); while (list_empty(&cache->flush_queue)) { spin_unlock_irqrestore(&cache->flush_queue_lock, flags); wait_event_interruptible_timeout( cache->flush_wait_queue, (!list_empty(&cache->flush_queue)), msecs_to_jiffies(100)); spin_lock_irqsave(&cache->flush_queue_lock, flags); if (cache->on_terminate) return; } /* * Pop a fluch_context from a list * and flush it. */ job = list_first_entry( &cache->flush_queue, struct flush_job, flush_queue); list_del(&job->flush_queue); spin_unlock_irqrestore(&cache->flush_queue_lock, flags); seg = job->seg; io_req = (struct dm_io_request) { .client = wb_io_client, .bi_rw = WRITE, .notify.fn = NULL, .mem.type = DM_IO_KMEM, .mem.ptr.addr = job->rambuf->data, }; region = (struct dm_io_region) { .bdev = cache->device->bdev, .sector = seg->start_sector, .count = (seg->length + 1) << 3, }; dm_safe_io_retry(&io_req, 1, ®ion, false); cache->last_flushed_segment_id = seg->global_id; complete_all(&seg->flush_done); complete_all(&job->rambuf->done); /* * Deferred ACK */ if (!bio_list_empty(&job->barrier_ios)) { struct bio *bio; blkdev_issue_flush(cache->device->bdev, GFP_NOIO, NULL); while ((bio = bio_list_pop(&job->barrier_ios))) bio_endio(bio, 0); mod_timer(&cache->barrier_deadline_timer, msecs_to_jiffies(cache->barrier_deadline_ms)); } kfree(job); } } ---------- format-cache.c ---------- /* * Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@xxxxxxxxx> * * This file is released under the GPL. 
*/ #include "format-cache.h" static int read_superblock_header(struct superblock_header_device *sup, struct dm_dev *dev) { int r = 0; struct dm_io_request io_req_sup; struct dm_io_region region_sup; void *buf = kmalloc(1 << SECTOR_SHIFT, GFP_KERNEL); if (!buf) { WBERR(); return -ENOMEM; } io_req_sup = (struct dm_io_request) { .client = wb_io_client, .bi_rw = READ, .notify.fn = NULL, .mem.type = DM_IO_KMEM, .mem.ptr.addr = buf, }; region_sup = (struct dm_io_region) { .bdev = dev->bdev, .sector = 0, .count = 1, }; r = dm_safe_io(&io_req_sup, 1, ®ion_sup, NULL, false); kfree(buf); if (r) { WBERR(); return r; } memcpy(sup, buf, sizeof(*sup)); return 0; } static int audit_superblock_header(struct superblock_header_device *sup) { u32 magic = le32_to_cpu(sup->magic); if (magic != WRITEBOOST_MAGIC) { WBERR(); return -EINVAL; } return 0; } /* * Check if the cache device is already formatted. * Returns 0 iff this routine runs without failure. * cache_valid is stored true iff the cache device * is formatted and needs not to be re-fomatted. */ int __must_check audit_cache_device(struct dm_dev *dev, bool *cache_valid) { int r = 0; struct superblock_header_device sup; r = read_superblock_header(&sup, dev); if (r) return r; *cache_valid = audit_superblock_header(&sup) ? 
false : true; return r; } static int format_superblock_header(struct dm_dev *dev) { int r = 0; struct dm_io_request io_req_sup; struct dm_io_region region_sup; struct superblock_header_device sup = { .magic = cpu_to_le32(WRITEBOOST_MAGIC), }; void *buf = kzalloc(1 << SECTOR_SHIFT, GFP_KERNEL); if (!buf) { WBERR(); return -ENOMEM; } memcpy(buf, &sup, sizeof(sup)); io_req_sup = (struct dm_io_request) { .client = wb_io_client, .bi_rw = WRITE_FUA, .notify.fn = NULL, .mem.type = DM_IO_KMEM, .mem.ptr.addr = buf, }; region_sup = (struct dm_io_region) { .bdev = dev->bdev, .sector = 0, .count = 1, }; r = dm_safe_io(&io_req_sup, 1, ®ion_sup, NULL, false); kfree(buf); if (r) { WBERR(); return r; } return 0; } struct format_segmd_context { int err; atomic64_t count; }; static void format_segmd_endio(unsigned long error, void *__context) { struct format_segmd_context *context = __context; if (error) context->err = 1; atomic64_dec(&context->count); } /* * Format superblock header and * all the metadata regions over the cache device. */ int __must_check format_cache_device(struct dm_dev *dev) { u64 i, nr_segments = calc_nr_segments(dev); struct format_segmd_context context; struct dm_io_request io_req_sup; struct dm_io_region region_sup; void *buf; int r = 0; /* * Zeroing the full superblock */ buf = kzalloc(1 << 20, GFP_KERNEL); if (!buf) { WBERR(); return -ENOMEM; } io_req_sup = (struct dm_io_request) { .client = wb_io_client, .bi_rw = WRITE_FUA, .notify.fn = NULL, .mem.type = DM_IO_KMEM, .mem.ptr.addr = buf, }; region_sup = (struct dm_io_region) { .bdev = dev->bdev, .sector = 0, .count = (1 << 11), }; r = dm_safe_io(&io_req_sup, 1, ®ion_sup, NULL, false); kfree(buf); if (r) { WBERR(); return r; } format_superblock_header(dev); /* Format the metadata regions */ /* * Count the number of segments */ atomic64_set(&context.count, nr_segments); context.err = 0; buf = kzalloc(1 << 12, GFP_KERNEL); if (!buf) { WBERR(); return -ENOMEM; } /* * Submit all the writes asynchronously. 
*/ for (i = 0; i < nr_segments; i++) { struct dm_io_request io_req_seg = { .client = wb_io_client, .bi_rw = WRITE, .notify.fn = format_segmd_endio, .notify.context = &context, .mem.type = DM_IO_KMEM, .mem.ptr.addr = buf, }; struct dm_io_region region_seg = { .bdev = dev->bdev, .sector = calc_segment_header_start(i), .count = (1 << 3), }; r = dm_safe_io(&io_req_seg, 1, ®ion_seg, NULL, false); if (r) { WBERR(); break; } } kfree(buf); if (r) { WBERR(); return r; } /* * Wait for all the writes complete. */ while (atomic64_read(&context.count)) schedule_timeout_interruptible(msecs_to_jiffies(100)); if (context.err) { WBERR(); return -EIO; } return blkdev_issue_flush(dev->bdev, GFP_KERNEL, NULL); } ---------- handle-io.c ---------- /* * Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@xxxxxxxxx> * * This file is released under the GPL. */ #include "handle-io.h" void inc_nr_dirty_caches(struct wb_device *wb) { BUG_ON(!wb); atomic64_inc(&wb->nr_dirty_caches); } static void dec_nr_dirty_caches(struct wb_device *wb) { BUG_ON(!wb); atomic64_dec(&wb->nr_dirty_caches); } void cleanup_mb_if_dirty(struct wb_cache *cache, struct segment_header *seg, struct metablock *mb) { unsigned long flags; bool b = false; lockseg(seg, flags); if (mb->dirty_bits) { mb->dirty_bits = 0; b = true; } unlockseg(seg, flags); if (b) dec_nr_dirty_caches(cache->wb); } u8 atomic_read_mb_dirtiness(struct segment_header *seg, struct metablock *mb) { unsigned long flags; u8 r; lockseg(seg, flags); r = mb->dirty_bits; unlockseg(seg, flags); return r; } static void inc_stat(struct wb_cache *cache, int rw, bool found, bool on_buffer, bool fullsize) { atomic64_t *v; int i = 0; if (rw) i |= (1 << STAT_WRITE); if (found) i |= (1 << STAT_HIT); if (on_buffer) i |= (1 << STAT_ON_BUFFER); if (fullsize) i |= (1 << STAT_FULLSIZE); v = &cache->stat[i]; atomic64_inc(v); } void clear_stat(struct wb_cache *cache) { int i; for (i = 0; i < STATLEN; i++) { atomic64_t *v = &cache->stat[i]; atomic64_set(v, 0); } } /* * Migrate 
a data on the cache device */ static void migrate_mb(struct wb_cache *cache, struct segment_header *seg, struct metablock *mb, u8 dirty_bits, bool thread) { struct wb_device *wb = cache->wb; if (!dirty_bits) return; if (dirty_bits == 255) { void *buf = kmalloc_retry(1 << 12, GFP_NOIO); struct dm_io_request io_req_r, io_req_w; struct dm_io_region region_r, region_w; io_req_r = (struct dm_io_request) { .client = wb_io_client, .bi_rw = READ, .notify.fn = NULL, .mem.type = DM_IO_KMEM, .mem.ptr.addr = buf, }; region_r = (struct dm_io_region) { .bdev = cache->device->bdev, .sector = calc_mb_start_sector(seg, mb->idx), .count = (1 << 3), }; dm_safe_io_retry(&io_req_r, 1, ®ion_r, thread); io_req_w = (struct dm_io_request) { .client = wb_io_client, .bi_rw = WRITE_FUA, .notify.fn = NULL, .mem.type = DM_IO_KMEM, .mem.ptr.addr = buf, }; region_w = (struct dm_io_region) { .bdev = wb->device->bdev, .sector = mb->sector, .count = (1 << 3), }; dm_safe_io_retry(&io_req_w, 1, ®ion_w, thread); kfree(buf); } else { void *buf = kmalloc_retry(1 << SECTOR_SHIFT, GFP_NOIO); size_t i; for (i = 0; i < 8; i++) { bool bit_on = dirty_bits & (1 << i); struct dm_io_request io_req_r, io_req_w; struct dm_io_region region_r, region_w; sector_t src; if (!bit_on) continue; io_req_r = (struct dm_io_request) { .client = wb_io_client, .bi_rw = READ, .notify.fn = NULL, .mem.type = DM_IO_KMEM, .mem.ptr.addr = buf, }; /* A tmp variable just to avoid 80 cols rule */ src = calc_mb_start_sector(seg, mb->idx) + i; region_r = (struct dm_io_region) { .bdev = cache->device->bdev, .sector = src, .count = 1, }; dm_safe_io_retry(&io_req_r, 1, ®ion_r, thread); io_req_w = (struct dm_io_request) { .client = wb_io_client, .bi_rw = WRITE, .notify.fn = NULL, .mem.type = DM_IO_KMEM, .mem.ptr.addr = buf, }; region_w = (struct dm_io_region) { .bdev = wb->device->bdev, .sector = mb->sector + 1 * i, .count = 1, }; dm_safe_io_retry(&io_req_w, 1, ®ion_w, thread); } kfree(buf); } } /* * Migrate the cache on the RAM buffer. 
* Calling this function is really rare. */ static void migrate_buffered_mb(struct wb_cache *cache, struct metablock *mb, u8 dirty_bits) { struct wb_device *wb = cache->wb; u8 i, k = 1 + (mb->idx % NR_CACHES_INSEG); sector_t offset = (k << 3); void *buf = kmalloc_retry(1 << SECTOR_SHIFT, GFP_NOIO); for (i = 0; i < 8; i++) { struct dm_io_request io_req; struct dm_io_region region; void *src; sector_t dest; bool bit_on = dirty_bits & (1 << i); if (!bit_on) continue; src = cache->current_rambuf->data + ((offset + i) << SECTOR_SHIFT); memcpy(buf, src, 1 << SECTOR_SHIFT); io_req = (struct dm_io_request) { .client = wb_io_client, .bi_rw = WRITE_FUA, .notify.fn = NULL, .mem.type = DM_IO_KMEM, .mem.ptr.addr = buf, }; dest = mb->sector + 1 * i; region = (struct dm_io_region) { .bdev = wb->device->bdev, .sector = dest, .count = 1, }; dm_safe_io_retry(&io_req, 1, ®ion, true); } kfree(buf); } static void bio_remap(struct bio *bio, struct dm_dev *dev, sector_t sector) { bio->bi_bdev = dev->bdev; bio->bi_sector = sector; } static sector_t calc_cache_alignment(struct wb_cache *cache, sector_t bio_sector) { return (bio_sector / (1 << 3)) * (1 << 3); } int writeboost_map(struct dm_target *ti, struct bio *bio #if LINUX_VERSION_CODE < PER_BIO_VERSION , union map_info *map_context #endif ) { unsigned long flags; struct segment_header *uninitialized_var(seg); struct metablock *mb, *new_mb; #if LINUX_VERSION_CODE >= PER_BIO_VERSION struct per_bio_data *map_context; #endif sector_t bio_count, bio_offset, s; bool bio_fullsize, found, on_buffer, refresh_segment, b; int rw; struct lookup_key key; struct ht_head *head; cache_nr update_mb_idx, idx_inseg, k; size_t start; void *data; struct wb_device *wb = ti->private; struct wb_cache *cache = wb->cache; struct dm_dev *orig = wb->device; #if LINUX_VERSION_CODE >= PER_BIO_VERSION map_context = dm_per_bio_data(bio, ti->per_bio_data_size); #endif map_context->ptr = NULL; /* * We only discard only the backing store because * blocks on cache device 
are unlikely to be discarded. * * Discarding blocks is likely to be operated * long after writing; * the block is likely to be migrated before. * Moreover, * we discard the segment at the end of migration * and that's enough for discarding blocks. */ if (bio->bi_rw & REQ_DISCARD) { bio_remap(bio, orig, bio->bi_sector); return DM_MAPIO_REMAPPED; } /* * defered ACK for barrier writes * * bio with REQ_FLUSH is guaranteed * to have no data. * So, simply queue it and return. */ if (bio->bi_rw & REQ_FLUSH) { BUG_ON(bio->bi_size); queue_barrier_io(cache, bio); return DM_MAPIO_SUBMITTED; } bio_count = bio->bi_size >> SECTOR_SHIFT; bio_fullsize = (bio_count == (1 << 3)); bio_offset = bio->bi_sector % (1 << 3); rw = bio_data_dir(bio); key = (struct lookup_key) { .sector = calc_cache_alignment(cache, bio->bi_sector), }; k = ht_hash(cache, &key); head = bigarray_at(cache->htable, k); /* * (Locking) * Why mutex? * * The reason we use mutex instead of rw_semaphore * that can allow truely concurrent read access * is that mutex is even lighter than rw_semaphore. * Since dm-writebuffer is a real performance centric software * the overhead of rw_semaphore is crucial. * All in all, * since exclusive region in read path is enough small * and cheap, using rw_semaphore and let the reads * execute concurrently won't improve the performance * as much as one expects. 
*/ mutex_lock(&cache->io_lock); mb = ht_lookup(cache, head, &key); if (mb) { seg = ((void *) mb) - (mb->idx % NR_CACHES_INSEG) * sizeof(struct metablock); atomic_inc(&seg->nr_inflight_ios); } found = (mb != NULL); on_buffer = false; if (found) on_buffer = is_on_buffer(cache, mb->idx); inc_stat(cache, rw, found, on_buffer, bio_fullsize); if (!rw) { u8 dirty_bits; mutex_unlock(&cache->io_lock); if (!found) { bio_remap(bio, orig, bio->bi_sector); return DM_MAPIO_REMAPPED; } dirty_bits = atomic_read_mb_dirtiness(seg, mb); if (unlikely(on_buffer)) { if (dirty_bits) migrate_buffered_mb(cache, mb, dirty_bits); /* * Cache class * Live and Stable * * Live: * The cache is on the RAM buffer. * * Stable: * The cache is not on the RAM buffer * but at least queued in flush_queue. */ /* * (Locking) * Dirtiness of a live cache * * We can assume dirtiness of a cache only increase * when it is on the buffer, we call this cache is live. * This eases the locking because * we don't worry the dirtiness of * a live cache fluctuates. */ atomic_dec(&seg->nr_inflight_ios); bio_remap(bio, orig, bio->bi_sector); return DM_MAPIO_REMAPPED; } wait_for_completion(&seg->flush_done); if (likely(dirty_bits == 255)) { bio_remap(bio, cache->device, calc_mb_start_sector(seg, mb->idx) + bio_offset); map_context->ptr = seg; } else { /* * (Locking) * Dirtiness of a stable cache * * Unlike the live caches that don't * fluctuate the dirtiness, * stable caches which are not on the buffer * but on the cache device * may decrease the dirtiness by other processes * than the migrate daemon. * This works fine * because migrating the same cache twice * doesn't craze the cache concistency. 
*/ migrate_mb(cache, seg, mb, dirty_bits, true); cleanup_mb_if_dirty(cache, seg, mb); atomic_dec(&seg->nr_inflight_ios); bio_remap(bio, orig, bio->bi_sector); } return DM_MAPIO_REMAPPED; } if (found) { if (unlikely(on_buffer)) { mutex_unlock(&cache->io_lock); update_mb_idx = mb->idx; goto write_on_buffer; } else { u8 dirty_bits = atomic_read_mb_dirtiness(seg, mb); /* * First clean up the previous cache * and migrate the cache if needed. */ bool needs_cleanup_prev_cache = !bio_fullsize || !(dirty_bits == 255); if (unlikely(needs_cleanup_prev_cache)) { wait_for_completion(&seg->flush_done); migrate_mb(cache, seg, mb, dirty_bits, true); } /* * Fullsize dirty cache * can be discarded without migration. */ cleanup_mb_if_dirty(cache, seg, mb); ht_del(cache, mb); atomic_dec(&seg->nr_inflight_ios); goto write_not_found; } } write_not_found: ; /* * If cache->cursor is 254, 509, ... * that is the last cache line in the segment. * We must flush the current segment and * get the new one. */ refresh_segment = !((cache->cursor + 1) % NR_CACHES_INSEG); if (refresh_segment) queue_current_buffer(cache); cache->cursor = (cache->cursor + 1) % cache->nr_caches; /* * update_mb_idx is the cache line index to update. 
*/ update_mb_idx = cache->cursor; seg = cache->current_seg; atomic_inc(&seg->nr_inflight_ios); new_mb = seg->mb_array + (update_mb_idx % NR_CACHES_INSEG); new_mb->dirty_bits = 0; ht_register(cache, head, &key, new_mb); mutex_unlock(&cache->io_lock); mb = new_mb; write_on_buffer: ; idx_inseg = update_mb_idx % NR_CACHES_INSEG; s = (idx_inseg + 1) << 3; b = false; lockseg(seg, flags); if (!mb->dirty_bits) { seg->length++; BUG_ON(seg->length > NR_CACHES_INSEG); b = true; } if (likely(bio_fullsize)) { mb->dirty_bits = 255; } else { u8 i; u8 acc_bits = 0; s += bio_offset; for (i = bio_offset; i < (bio_offset+bio_count); i++) acc_bits += (1 << i); mb->dirty_bits |= acc_bits; } BUG_ON(!mb->dirty_bits); unlockseg(seg, flags); if (b) inc_nr_dirty_caches(wb); start = s << SECTOR_SHIFT; data = bio_data(bio); memcpy(cache->current_rambuf->data + start, data, bio->bi_size); atomic_dec(&seg->nr_inflight_ios); /* * deferred ACK for barrier writes * * bio with REQ_FUA flag has data. * So, we run through the path for the * ordinary bio. And the data is * now stored in the RAM buffer. * After that, queue it and return * to defer completion. */ if (bio->bi_rw & REQ_FUA) { queue_barrier_io(cache, bio); return DM_MAPIO_SUBMITTED; } bio_endio(bio, 0); return DM_MAPIO_SUBMITTED; } int writeboost_end_io(struct dm_target *ti, struct bio *bio, int error #if LINUX_VERSION_CODE < PER_BIO_VERSION , union map_info *map_context #endif ) { struct segment_header *seg; #if LINUX_VERSION_CODE >= PER_BIO_VERSION struct per_bio_data *map_context = dm_per_bio_data(bio, ti->per_bio_data_size); #endif if (!map_context->ptr) return 0; seg = map_context->ptr; atomic_dec(&seg->nr_inflight_ios); return 0; } ---------- hashtable.c ---------- /* * Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@xxxxxxxxx> * * This file is released under the GPL. */ #include "hashtable.h" /* * Initialize the Hash Table. 
*/ int __must_check ht_empty_init(struct wb_cache *cache) { cache_nr idx; size_t i; size_t nr_heads; struct bigarray *arr; cache->htsize = cache->nr_caches; nr_heads = cache->htsize + 1; arr = make_bigarray(sizeof(struct ht_head), nr_heads); if (!arr) { WBERR(); return -ENOMEM; } cache->htable = arr; for (i = 0; i < nr_heads; i++) { struct ht_head *hd = bigarray_at(arr, i); INIT_HLIST_HEAD(&hd->ht_list); } /* * Our hashtable has one special bucket called null head. * Orphan metablocks are linked to the null head. */ cache->null_head = bigarray_at(cache->htable, cache->htsize); for (idx = 0; idx < cache->nr_caches; idx++) { struct metablock *mb = mb_at(cache, idx); hlist_add_head(&mb->ht_list, &cache->null_head->ht_list); } return 0; } cache_nr ht_hash(struct wb_cache *cache, struct lookup_key *key) { return key->sector % cache->htsize; } static bool mb_hit(struct metablock *mb, struct lookup_key *key) { return mb->sector == key->sector; } void ht_del(struct wb_cache *cache, struct metablock *mb) { struct ht_head *null_head; hlist_del(&mb->ht_list); null_head = cache->null_head; hlist_add_head(&mb->ht_list, &null_head->ht_list); } void ht_register(struct wb_cache *cache, struct ht_head *head, struct lookup_key *key, struct metablock *mb) { hlist_del(&mb->ht_list); hlist_add_head(&mb->ht_list, &head->ht_list); mb->sector = key->sector; }; struct metablock *ht_lookup(struct wb_cache *cache, struct ht_head *head, struct lookup_key *key) { struct metablock *mb, *found = NULL; #if LINUX_VERSION_CODE >= KERNEL_VERSION(3, 9, 0) hlist_for_each_entry(mb, &head->ht_list, ht_list) #else struct hlist_node *pos; hlist_for_each_entry(mb, pos, &head->ht_list, ht_list) #endif { if (mb_hit(mb, key)) { found = mb; break; } } return found; } /* * Discard all the metablock in a segment. 
*/ void discard_caches_inseg(struct wb_cache *cache, struct segment_header *seg) { u8 i; for (i = 0; i < NR_CACHES_INSEG; i++) { struct metablock *mb = seg->mb_array + i; ht_del(cache, mb); } } ---------- migrate-daemon.c ---------- /* * Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@xxxxxxxxx> * * This file is released under the GPL. */ #include "migrate-daemon.h" static void migrate_endio(unsigned long error, void *context) { struct wb_cache *cache = context; if (error) atomic_inc(&cache->migrate_fail_count); if (atomic_dec_and_test(&cache->migrate_io_count)) wake_up_interruptible(&cache->migrate_wait_queue); } /* * Submit the segment data at position k * in migrate buffer. * Batched migration first gather all the segments * to migrate into a migrate buffer. * So, there are a number of segment data * in the buffer. * This function submits the one in position k. */ static void submit_migrate_io(struct wb_cache *cache, struct segment_header *seg, size_t k) { u8 i, j; size_t a = NR_CACHES_INSEG * k; void *p = cache->migrate_buffer + (NR_CACHES_INSEG << 12) * k; for (i = 0; i < seg->length; i++) { struct metablock *mb = seg->mb_array + i; struct wb_device *wb = cache->wb; u8 dirty_bits = *(cache->dirtiness_snapshot + (a + i)); unsigned long offset; void *base, *addr; struct dm_io_request io_req_w; struct dm_io_region region_w; if (!dirty_bits) continue; offset = i << 12; base = p + offset; if (dirty_bits == 255) { addr = base; io_req_w = (struct dm_io_request) { .client = wb_io_client, .bi_rw = WRITE, .notify.fn = migrate_endio, .notify.context = cache, .mem.type = DM_IO_VMA, .mem.ptr.vma = addr, }; region_w = (struct dm_io_region) { .bdev = wb->device->bdev, .sector = mb->sector, .count = (1 << 3), }; dm_safe_io_retry(&io_req_w, 1, ®ion_w, false); } else { for (j = 0; j < 8; j++) { bool b = dirty_bits & (1 << j); if (!b) continue; addr = base + (j << SECTOR_SHIFT); io_req_w = (struct dm_io_request) { .client = wb_io_client, .bi_rw = WRITE, .notify.fn = 
migrate_endio, .notify.context = cache, .mem.type = DM_IO_VMA, .mem.ptr.vma = addr, }; region_w = (struct dm_io_region) { .bdev = wb->device->bdev, .sector = mb->sector + j, .count = 1, }; dm_safe_io_retry( &io_req_w, 1, ®ion_w, false); } } } } static void memorize_dirty_state(struct wb_cache *cache, struct segment_header *seg, size_t k, size_t *migrate_io_count) { u8 i, j; size_t a = NR_CACHES_INSEG * k; void *p = cache->migrate_buffer + (NR_CACHES_INSEG << 12) * k; struct metablock *mb; struct dm_io_request io_req_r = { .client = wb_io_client, .bi_rw = READ, .notify.fn = NULL, .mem.type = DM_IO_VMA, .mem.ptr.vma = p, }; struct dm_io_region region_r = { .bdev = cache->device->bdev, .sector = seg->start_sector + (1 << 3), .count = seg->length << 3, }; dm_safe_io_retry(&io_req_r, 1, ®ion_r, false); /* * We take snapshot of the dirtiness in the segments. * The snapshot segments * are dirtier than themselves of any future moment * and we will migrate the possible dirtiest * state of the segments * which won't lose any dirty data that was acknowledged. */ for (i = 0; i < seg->length; i++) { mb = seg->mb_array + i; *(cache->dirtiness_snapshot + (a + i)) = atomic_read_mb_dirtiness(seg, mb); } for (i = 0; i < seg->length; i++) { u8 dirty_bits; mb = seg->mb_array + i; dirty_bits = *(cache->dirtiness_snapshot + (a + i)); if (!dirty_bits) continue; if (dirty_bits == 255) { (*migrate_io_count)++; } else { for (j = 0; j < 8; j++) { if (dirty_bits & (1 << j)) (*migrate_io_count)++; } } } } static void cleanup_segment(struct wb_cache *cache, struct segment_header *seg) { u8 i; for (i = 0; i < seg->length; i++) { struct metablock *mb = seg->mb_array + i; cleanup_mb_if_dirty(cache, seg, mb); } } static void migrate_linked_segments(struct wb_cache *cache) { struct segment_header *seg; size_t k, migrate_io_count = 0; /* * Memorize the dirty state to migrate before going in. 
* - How many migration writes should be submitted atomically, * - Which cache lines are dirty to migarate * - etc. */ k = 0; list_for_each_entry(seg, &cache->migrate_list, migrate_list) { memorize_dirty_state(cache, seg, k, &migrate_io_count); k++; } migrate_write: atomic_set(&cache->migrate_io_count, migrate_io_count); atomic_set(&cache->migrate_fail_count, 0); k = 0; list_for_each_entry(seg, &cache->migrate_list, migrate_list) { submit_migrate_io(cache, seg, k); k++; } wait_event_interruptible(cache->migrate_wait_queue, atomic_read(&cache->migrate_io_count) == 0); if (atomic_read(&cache->migrate_fail_count)) { WBWARN("%u writebacks failed. retry.", atomic_read(&cache->migrate_fail_count)); goto migrate_write; } BUG_ON(atomic_read(&cache->migrate_io_count)); list_for_each_entry(seg, &cache->migrate_list, migrate_list) { cleanup_segment(cache, seg); } /* * The segment may have a block * that returns ACK for persistent write * on the cache device. * Migrating them in non-persistent way * is betrayal to the client * who received the ACK and * expects the data is persistent. * Since it is difficult to know * whether a cache in a segment * is of that status * we are on the safe side * on this issue by always * migrating those data persistently. */ blkdev_issue_flush(cache->wb->device->bdev, GFP_NOIO, NULL); /* * Discarding the migrated regions * can avoid unnecessary wear amplifier in the future. * * But note that we should not discard * the metablock region because * whether or not to ensure * the discarded block returns certain value * is depends on venders * and unexpected metablock data * will craze the cache. 
*/ list_for_each_entry(seg, &cache->migrate_list, migrate_list) { blkdev_issue_discard(cache->device->bdev, seg->start_sector + (1 << 3), seg->length << 3, GFP_NOIO, 0); } } void migrate_proc(struct work_struct *work) { struct wb_cache *cache = container_of(work, struct wb_cache, migrate_work); while (true) { bool allow_migrate; size_t i, nr_mig_candidates, nr_mig; struct segment_header *seg, *tmp; WBINFO(); if (cache->on_terminate) return; /* * If reserving_id > 0 * Migration should be immediate. */ allow_migrate = cache->reserving_segment_id || cache->allow_migrate; if (!allow_migrate) { schedule_timeout_interruptible(msecs_to_jiffies(1000)); continue; } nr_mig_candidates = cache->last_flushed_segment_id - cache->last_migrated_segment_id; if (!nr_mig_candidates) { schedule_timeout_interruptible(msecs_to_jiffies(1000)); continue; } if (cache->nr_cur_batched_migration != cache->nr_max_batched_migration){ vfree(cache->migrate_buffer); kfree(cache->dirtiness_snapshot); cache->nr_cur_batched_migration = cache->nr_max_batched_migration; cache->migrate_buffer = vmalloc(cache->nr_cur_batched_migration * (NR_CACHES_INSEG << 12)); cache->dirtiness_snapshot = kmalloc_retry(cache->nr_cur_batched_migration * NR_CACHES_INSEG, GFP_NOIO); BUG_ON(!cache->migrate_buffer); BUG_ON(!cache->dirtiness_snapshot); } /* * Batched Migration: * We will migrate at most nr_max_batched_migration * segments at a time. */ nr_mig = min(nr_mig_candidates, cache->nr_cur_batched_migration); /* * Add segments to migrate atomically. */ for (i = 1; i <= nr_mig; i++) { seg = get_segment_header_by_id( cache, cache->last_migrated_segment_id + i); list_add_tail(&seg->migrate_list, &cache->migrate_list); } migrate_linked_segments(cache); /* * (Locking) * Only line of code changes * last_migrate_segment_id during runtime. 
*/ cache->last_migrated_segment_id += nr_mig; list_for_each_entry_safe(seg, tmp, &cache->migrate_list, migrate_list) { complete_all(&seg->migrate_done); list_del(&seg->migrate_list); } } } void wait_for_migration(struct wb_cache *cache, size_t id) { struct segment_header *seg = get_segment_header_by_id(cache, id); /* * Set reserving_segment_id to non zero * to force the migartion daemon * to complete migarate of this segment * immediately. */ cache->reserving_segment_id = id; wait_for_completion(&seg->migrate_done); cache->reserving_segment_id = 0; } ---------- migrate-modulator.c ---------- /* * Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@xxxxxxxxx> * * This file is released under the GPL. */ #include "migrate-modulator.h" void modulator_proc(struct work_struct *work) { struct wb_cache *cache = container_of(work, struct wb_cache, modulator_work); struct wb_device *wb = cache->wb; struct hd_struct *hd = wb->device->bdev->bd_part; unsigned long old = 0, new, util; unsigned long intvl = 1000; while (true) { if (cache->on_terminate) return; new = jiffies_to_msecs(part_stat_read(hd, io_ticks)); if (!cache->enable_migration_modulator) goto modulator_update; util = (100 * (new - old)) / 1000; WBINFO("%u", (unsigned) util); if (util < wb->migrate_threshold) cache->allow_migrate = true; else cache->allow_migrate = false; modulator_update: old = new; schedule_timeout_interruptible(msecs_to_jiffies(intvl)); } } ---------- queue-flush-job.c ---------- /* * Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@xxxxxxxxx> * * This file is released under the GPL. */ #include "queue-flush-job.h" static u8 count_dirty_caches_remained(struct segment_header *seg) { u8 i, count = 0; struct metablock *mb; for (i = 0; i < seg->length; i++) { mb = seg->mb_array + i; if (mb->dirty_bits) count++; } return count; } /* * Make a metadata in segment data to flush. 
* @dest The metadata part of the segment to flush */ static void prepare_segment_header_device(struct segment_header_device *dest, struct wb_cache *cache, struct segment_header *src) { cache_nr i; u8 left, right; dest->global_id = cpu_to_le64(src->global_id); dest->length = src->length; dest->lap = cpu_to_le32(calc_segment_lap(cache, src->global_id)); left = src->length - 1; right = (cache->cursor) % NR_CACHES_INSEG; BUG_ON(left != right); for (i = 0; i < src->length; i++) { struct metablock *mb = src->mb_array + i; struct metablock_device *mbdev = &dest->mbarr[i]; mbdev->sector = cpu_to_le64(mb->sector); mbdev->dirty_bits = mb->dirty_bits; mbdev->lap = cpu_to_le32(dest->lap); } } static void prepare_meta_rambuffer(void *rambuffer, struct wb_cache *cache, struct segment_header *seg) { prepare_segment_header_device(rambuffer, cache, seg); } /* * Queue the current segment into the queue * and prepare a new segment. */ static void queue_flushing(struct wb_cache *cache) { unsigned long flags; struct segment_header *current_seg = cache->current_seg, *new_seg; struct flush_job *job; bool empty; struct rambuffer *next_rambuf; size_t n1 = 0, n2 = 0; u64 next_id; while (atomic_read(¤t_seg->nr_inflight_ios)) { n1++; if (n1 == 100) WBWARN(); schedule_timeout_interruptible(msecs_to_jiffies(1)); } prepare_meta_rambuffer(cache->current_rambuf->data, cache, cache->current_seg); INIT_COMPLETION(current_seg->migrate_done); INIT_COMPLETION(current_seg->flush_done); job = kmalloc_retry(sizeof(*job), GFP_NOIO); INIT_LIST_HEAD(&job->flush_queue); job->seg = current_seg; job->rambuf = cache->current_rambuf; bio_list_init(&job->barrier_ios); bio_list_merge(&job->barrier_ios, &cache->barrier_ios); bio_list_init(&cache->barrier_ios); spin_lock_irqsave(&cache->flush_queue_lock, flags); empty = list_empty(&cache->flush_queue); list_add_tail(&job->flush_queue, &cache->flush_queue); spin_unlock_irqrestore(&cache->flush_queue_lock, flags); if (empty) 
wake_up_interruptible(&cache->flush_wait_queue); next_id = current_seg->global_id + 1; new_seg = get_segment_header_by_id(cache, next_id); new_seg->global_id = next_id; while (atomic_read(&new_seg->nr_inflight_ios)) { n2++; if (n2 == 100) WBWARN(); schedule_timeout_interruptible(msecs_to_jiffies(1)); } BUG_ON(count_dirty_caches_remained(new_seg)); discard_caches_inseg(cache, new_seg); /* * Set the cursor to the last of the flushed segment. */ cache->cursor = current_seg->start_idx + (NR_CACHES_INSEG - 1); new_seg->length = 0; next_rambuf = cache->rambuf_pool + (next_id % NR_RAMBUF_POOL); wait_for_completion(&next_rambuf->done); INIT_COMPLETION(next_rambuf->done); cache->current_rambuf = next_rambuf; cache->current_seg = new_seg; } void queue_current_buffer(struct wb_cache *cache) { /* * Before we get the next segment * we must wait until the segment is all clean. * A clean segment doesn't have * log to flush and dirties to migrate. */ u64 next_id = cache->current_seg->global_id + 1; struct segment_header *next_seg = get_segment_header_by_id(cache, next_id); wait_for_completion(&next_seg->flush_done); wait_for_migration(cache, next_id); queue_flushing(cache); } /* * flush all the dirty data at a moment * but NOT persistently. * Clean up the writes before termination * is an example of the usecase. */ void flush_current_buffer(struct wb_cache *cache) { struct segment_header *old_seg; mutex_lock(&cache->io_lock); old_seg = cache->current_seg; queue_current_buffer(cache); cache->cursor = (cache->cursor + 1) % cache->nr_caches; cache->current_seg->length = 1; mutex_unlock(&cache->io_lock); wait_for_completion(&old_seg->flush_done); } ---------- rambuf.c ---------- /* * Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@xxxxxxxxx> * * This file is released under the GPL. 
*/ #include "rambuf.h" int __must_check init_rambuf_pool(struct wb_cache *cache) { size_t i, j; struct rambuffer *rambuf; cache->rambuf_pool = kmalloc(sizeof(struct rambuffer) * NR_RAMBUF_POOL, GFP_KERNEL); if (!cache->rambuf_pool) { WBERR(); return -ENOMEM; } for (i = 0; i < NR_RAMBUF_POOL; i++) { rambuf = cache->rambuf_pool + i; init_completion(&rambuf->done); complete_all(&rambuf->done); rambuf->data = kmalloc( 1 << (WB_SEGMENTSIZE_ORDER + SECTOR_SHIFT), GFP_KERNEL); if (!rambuf->data) { WBERR(); for (j = 0; j < i; j++) { rambuf = cache->rambuf_pool + j; kfree(rambuf->data); } kfree(cache->rambuf_pool); return -ENOMEM; } } return 0; } void free_rambuf_pool(struct wb_cache *cache) { struct rambuffer *rambuf; size_t i; for (i = 0; i < NR_RAMBUF_POOL; i++) { rambuf = cache->rambuf_pool + i; kfree(rambuf->data); } kfree(cache->rambuf_pool); } ---------- recover.c ---------- /* * Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@xxxxxxxxx> * * This file is released under the GPL. */ #include "recover.h" static int __must_check read_superblock_record(struct superblock_record_device *record, struct wb_cache *cache) { int r = 0; struct dm_io_request io_req; struct dm_io_region region; void *buf = kmalloc(1 << SECTOR_SHIFT, GFP_KERNEL); if (!buf) { WBERR(); return -ENOMEM; } io_req = (struct dm_io_request) { .client = wb_io_client, .bi_rw = READ, .notify.fn = NULL, .mem.type = DM_IO_KMEM, .mem.ptr.addr = buf, }; region = (struct dm_io_region) { .bdev = cache->device->bdev, .sector = (1 << 11) - 1, .count = 1, }; r = dm_safe_io(&io_req, 1, ®ion, NULL, true); kfree(buf); if (r) { WBERR(); return r; } memcpy(record, buf, sizeof(*record)); return r; } static int __must_check read_segment_header_device(struct segment_header_device *dest, struct wb_cache *cache, size_t segment_idx) { int r = 0; struct dm_io_request io_req; struct dm_io_region region; void *buf = kmalloc(1 << 12, GFP_KERNEL); if (!buf) { WBERR(); return -ENOMEM; } io_req = (struct dm_io_request) { .client = 
wb_io_client, .bi_rw = READ, .notify.fn = NULL, .mem.type = DM_IO_KMEM, .mem.ptr.addr = buf, }; region = (struct dm_io_region) { .bdev = cache->device->bdev, .sector = calc_segment_header_start(segment_idx), .count = (1 << 3), }; r = dm_safe_io(&io_req, 1, ®ion, NULL, false); kfree(buf); if (r) { WBERR(); return r; } memcpy(dest, buf, sizeof(*dest)); return r; } /* * Read the on-disk metadata of the segment * and update the in-core cache metadata structure * like Hash Table. */ static void update_by_segment_header_device(struct wb_cache *cache, struct segment_header_device *src) { cache_nr i; struct segment_header *seg = get_segment_header_by_id(cache, src->global_id); seg->length = src->length; INIT_COMPLETION(seg->migrate_done); for (i = 0 ; i < src->length; i++) { cache_nr k; struct lookup_key key; struct ht_head *head; struct metablock *found, *mb = seg->mb_array + i; struct metablock_device *mbdev = &src->mbarr[i]; if (!mbdev->dirty_bits) continue; mb->sector = le64_to_cpu(mbdev->sector); mb->dirty_bits = mbdev->dirty_bits; inc_nr_dirty_caches(cache->wb); key = (struct lookup_key) { .sector = mb->sector, }; k = ht_hash(cache, &key); head = bigarray_at(cache->htable, k); found = ht_lookup(cache, head, &key); if (found) ht_del(cache, found); ht_register(cache, head, &key, mb); } } /* * If only if the lap attributes * are the same between header and all the metablock, * the segment is judged to be flushed correctly * and then merge into the runtime structure. * Otherwise, ignored. 
*/ static bool checkup_atomicity(struct segment_header_device *header) { u8 i; u32 a = le32_to_cpu(header->lap), b; for (i = 0; i < header->length; i++) { struct metablock_device *o; o = header->mbarr + i; b = le32_to_cpu(o->lap); if (a != b) return false; } return true; } int __must_check recover_cache(struct wb_cache *cache) { int r = 0; struct segment_header_device *header; struct segment_header *seg; u64 i, j, max_id, oldest_id, last_flushed_id, init_segment_id, oldest_idx, nr_segments = cache->nr_segments, header_id, record_id; struct superblock_record_device uninitialized_var(record); r = read_superblock_record(&record, cache); if (r) { WBERR(); return r; } WBINFO("%llu", record.last_migrated_segment_id); record_id = le64_to_cpu(record.last_migrated_segment_id); WBINFO("%llu", record_id); header = kmalloc(sizeof(*header), GFP_KERNEL); if (!header) { WBERR(); return -ENOMEM; } /* * Finding the oldest, non-zero id and its index. */ max_id = SZ_MAX; oldest_id = max_id; oldest_idx = 0; for (i = 0; i < nr_segments; i++) { r = read_segment_header_device(header, cache, i); if (r) { WBERR(); kfree(header); return r; } header_id = le64_to_cpu(header->global_id); if (header_id < 1) continue; if (header_id < oldest_id) { oldest_idx = i; oldest_id = header_id; } } last_flushed_id = 0; /* * This is an invariant. * We always start from the segment * that is right after the last_flush_id. */ init_segment_id = last_flushed_id + 1; /* * If no segment was flushed * then there is nothing to recover. */ if (oldest_id == max_id) goto setup_init_segment; /* * What we have to do in the next loop is to * revive the segments that are * flushed but yet not migrated. */ /* * Example: * There are only 5 segments. * The segments we will consider are of id k+2 and k+3 * because they are dirty but not migrated. 
* * id: [ k+3 ][ k+4 ][ k ][ k+1 ][ K+2 ] * last_flushed init_seg migrated last_migrated flushed */ for (i = oldest_idx; i < (nr_segments + oldest_idx); i++) { j = i % nr_segments; r = read_segment_header_device(header, cache, j); if (r) { WBERR(); kfree(header); return r; } header_id = le64_to_cpu(header->global_id); /* * Valid global_id > 0. * We encounter header with global_id = 0 and * we can consider * this and the followings are all invalid. */ if (header_id <= last_flushed_id) break; if (!checkup_atomicity(header)) { WBWARN("header atomicity broken id %llu", header_id); break; } /* * Now the header is proven valid. */ last_flushed_id = header_id; init_segment_id = last_flushed_id + 1; /* * If the data is already on the backing store, * we ignore the segment. */ if (header_id <= record_id) continue; update_by_segment_header_device(cache, header); } setup_init_segment: kfree(header); seg = get_segment_header_by_id(cache, init_segment_id); seg->global_id = init_segment_id; atomic_set(&seg->nr_inflight_ios, 0); cache->last_flushed_segment_id = seg->global_id - 1; cache->last_migrated_segment_id = cache->last_flushed_segment_id > cache->nr_segments ? cache->last_flushed_segment_id - cache->nr_segments : 0; if (record_id > cache->last_migrated_segment_id) cache->last_migrated_segment_id = record_id; WBINFO("%llu", cache->last_migrated_segment_id); wait_for_migration(cache, seg->global_id); discard_caches_inseg(cache, seg); /* * cursor is set to the first element of the segment. * This means that we will not use the element. */ cache->cursor = seg->start_idx; seg->length = 1; cache->current_seg = seg; return 0; } ---------- segment.c ---------- /* * Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@xxxxxxxxx> * * This file is released under the GPL. */ #include "segment.h" /* * Get the in-core metablock of the given index. 
*/ struct metablock *mb_at(struct wb_cache *cache, cache_nr idx) { u64 seg_idx = idx / NR_CACHES_INSEG; struct segment_header *seg = bigarray_at(cache->segment_header_array, seg_idx); cache_nr idx_inseg = idx % NR_CACHES_INSEG; return seg->mb_array + idx_inseg; } static void mb_array_empty_init(struct wb_cache *cache) { size_t i; for (i = 0; i < cache->nr_caches; i++) { struct metablock *mb = mb_at(cache, i); INIT_HLIST_NODE(&mb->ht_list); mb->idx = i; mb->dirty_bits = 0; } } int __must_check init_segment_header_array(struct wb_cache *cache) { u64 segment_idx, nr_segments = cache->nr_segments; cache->segment_header_array = make_bigarray(sizeof(struct segment_header), nr_segments); if (!cache->segment_header_array) { WBERR(); return -ENOMEM; } for (segment_idx = 0; segment_idx < nr_segments; segment_idx++) { struct segment_header *seg = bigarray_at(cache->segment_header_array, segment_idx); seg->start_idx = NR_CACHES_INSEG * segment_idx; seg->start_sector = ((segment_idx % nr_segments) + 1) * (1 << WB_SEGMENTSIZE_ORDER); seg->length = 0; atomic_set(&seg->nr_inflight_ios, 0); spin_lock_init(&seg->lock); INIT_LIST_HEAD(&seg->migrate_list); init_completion(&seg->flush_done); complete_all(&seg->flush_done); init_completion(&seg->migrate_done); complete_all(&seg->migrate_done); } mb_array_empty_init(cache); return 0; } /* * Get the segment from the segment id. * The Index of the segment is calculated from the segment id. 
*/ struct segment_header *get_segment_header_by_id(struct wb_cache *cache, size_t segment_id) { struct segment_header *r = bigarray_at(cache->segment_header_array, (segment_id - 1) % cache->nr_segments); return r; } u32 calc_segment_lap(struct wb_cache *cache, size_t segment_id) { u32 a = (segment_id - 1) / cache->nr_segments; return a + 1; }; sector_t calc_mb_start_sector(struct segment_header *seg, cache_nr mb_idx) { size_t k = 1 + (mb_idx % NR_CACHES_INSEG); return seg->start_sector + (k << 3); } sector_t calc_segment_header_start(size_t segment_idx) { return (1 << WB_SEGMENTSIZE_ORDER) * (segment_idx + 1); } u64 calc_nr_segments(struct dm_dev *dev) { sector_t devsize = dm_devsize(dev); return devsize / (1 << WB_SEGMENTSIZE_ORDER) - 1; } bool is_on_buffer(struct wb_cache *cache, cache_nr mb_idx) { cache_nr start = cache->current_seg->start_idx; if (mb_idx < start) return false; if (mb_idx >= (start + NR_CACHES_INSEG)) return false; return true; } ---------- superblock-recorder.c ---------- /* * Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@xxxxxxxxx> * * This file is released under the GPL. 
*/ #include "superblock-recorder.h" static void update_superblock_record(struct wb_cache *cache) { struct superblock_record_device o; void *buf; struct dm_io_request io_req; struct dm_io_region region; o.last_migrated_segment_id = cpu_to_le64(cache->last_migrated_segment_id); buf = kmalloc_retry(1 << SECTOR_SHIFT, GFP_NOIO | __GFP_ZERO); memcpy(buf, &o, sizeof(o)); io_req = (struct dm_io_request) { .client = wb_io_client, .bi_rw = WRITE_FUA, .notify.fn = NULL, .mem.type = DM_IO_KMEM, .mem.ptr.addr = buf, }; region = (struct dm_io_region) { .bdev = cache->device->bdev, .sector = (1 << 11) - 1, .count = 1, }; dm_safe_io_retry(&io_req, 1, ®ion, true); kfree(buf); } void recorder_proc(struct work_struct *work) { struct wb_cache *cache = container_of(work, struct wb_cache, recorder_work); unsigned long intvl; while (true) { if (cache->on_terminate) return; /* sec -> ms */ intvl = cache->update_record_interval * 1000; if (!intvl) { schedule_timeout_interruptible(msecs_to_jiffies(1000)); continue; } WBINFO(); update_superblock_record(cache); schedule_timeout_interruptible(msecs_to_jiffies(intvl)); } } ---------- target.c ---------- /* * writeboost * Log-structured Caching for Linux * * Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@xxxxxxxxx> * * This file is released under the GPL. */ #include "target.h" /* * <backing dev> <cache dev> */ static int writeboost_ctr(struct dm_target *ti, unsigned int argc, char **argv) { int r = 0; bool cache_valid; struct wb_device *wb; struct wb_cache *cache; struct dm_dev *origdev, *cachedev; #if LINUX_VERSION_CODE >= KERNEL_VERSION(3, 6, 0) r = dm_set_target_max_io_len(ti, (1 << 3)); if (r) { WBERR(); return r; } #else ti->split_io = (1 << 3); #endif wb = kzalloc(sizeof(*wb), GFP_KERNEL); if (!wb) { WBERR(); return -ENOMEM; } /* * EMC's textbook on storage system says * storage should keep its disk util less * than 70%. 
*/ wb->migrate_threshold = 70; atomic64_set(&wb->nr_dirty_caches, 0); r = dm_get_device(ti, argv[0], dm_table_get_mode(ti->table), &origdev); if (r) { WBERR("%d", r); goto bad_get_device_orig; } wb->device = origdev; wb->cache = NULL; if (dm_get_device(ti, argv[1], dm_table_get_mode(ti->table), &cachedev)) { WBERR(); goto bad_get_device_cache; } r = audit_cache_device(cachedev, &cache_valid); if (r) { WBERR("%d", r); /* * If something happens in auditing the cache * such as read io error either go formatting * or resume it trusting the cache is valid * are dangerous. So we quit. */ goto bad_audit_cache; } if (!cache_valid) { r = format_cache_device(cachedev); if (r) { WBERR("%d", r); goto bad_format_cache; } } cache = kzalloc(sizeof(*cache), GFP_KERNEL); if (!cache) { WBERR(); goto bad_alloc_cache; } wb->cache = cache; wb->cache->wb = wb; r = resume_cache(cache, cachedev); if (r) { WBERR("%d", r); goto bad_resume_cache; } wb->ti = ti; ti->private = wb; #if LINUX_VERSION_CODE >= PER_BIO_VERSION ti->per_bio_data_size = sizeof(struct per_bio_data); #endif #if LINUX_VERSION_CODE >= KERNEL_VERSION(3, 9, 0) ti->num_flush_bios = 1; ti->num_discard_bios = 1; #else ti->num_flush_requests = 1; ti->num_discard_requests = 1; #endif ti->discard_zeroes_data_unsupported = true; return 0; bad_resume_cache: kfree(cache); bad_alloc_cache: bad_format_cache: bad_audit_cache: dm_put_device(ti, cachedev); bad_get_device_cache: dm_put_device(ti, origdev); bad_get_device_orig: kfree(wb); return r; } static void writeboost_dtr(struct dm_target *ti) { struct wb_device *wb = ti->private; struct wb_cache *cache = wb->cache; /* * Synchronize all the dirty writes * before termination. 
*/ cache->sync_interval = 1; free_cache(cache); kfree(cache); dm_put_device(wb->ti, cache->device); dm_put_device(ti, wb->device); ti->private = NULL; kfree(wb); } static int writeboost_message(struct dm_target *ti, unsigned argc, char **argv) { struct wb_device *wb = ti->private; struct wb_cache *cache = wb->cache; char *cmd = argv[0]; unsigned long tmp; if (!strcasecmp(cmd, "clear_stat")) { struct wb_cache *cache = wb->cache; clear_stat(cache); return 0; } if (kstrtoul(argv[1], 10, &tmp)) return -EINVAL; if (!strcasecmp(cmd, "allow_migrate")) { if (tmp > 1) return -EINVAL; cache->allow_migrate = tmp; return 0; } if (!strcasecmp(cmd, "enable_migration_modulator")) { if (tmp > 1) return -EINVAL; cache->enable_migration_modulator = tmp; return 0; } if (!strcasecmp(cmd, "barrier_deadline_ms")) { if (tmp < 1) return -EINVAL; cache->barrier_deadline_ms = tmp; return 0; } if (!strcasecmp(cmd, "nr_max_batched_migration")) { if (tmp < 1) return -EINVAL; cache->nr_max_batched_migration = tmp; return 0; } if (!strcasecmp(cmd, "migrate_threshold")) { wb->migrate_threshold = tmp; return 0; } if (!strcasecmp(cmd, "update_record_interval")) { cache->update_record_interval = tmp; return 0; } if (!strcasecmp(cmd, "sync_interval")) { cache->sync_interval = tmp; return 0; } return -EINVAL; } static int writeboost_merge(struct dm_target *ti, struct bvec_merge_data *bvm, struct bio_vec *biovec, int max_size) { struct wb_device *wb = ti->private; struct dm_dev *device = wb->device; struct request_queue *q = bdev_get_queue(device->bdev); if (!q->merge_bvec_fn) return max_size; bvm->bi_bdev = device->bdev; return min(max_size, q->merge_bvec_fn(q, bvm, biovec)); } static int writeboost_iterate_devices(struct dm_target *ti, iterate_devices_callout_fn fn, void *data) { struct wb_device *wb = ti->private; struct dm_dev *orig = wb->device; sector_t start = 0; sector_t len = dm_devsize(orig); return fn(ti, orig, start, len, data); } static void writeboost_io_hints(struct dm_target *ti, struct 
queue_limits *limits) { blk_limits_io_min(limits, 512); blk_limits_io_opt(limits, 4096); } static #if LINUX_VERSION_CODE >= KERNEL_VERSION(3, 8, 0) void #else int #endif writeboost_status( struct dm_target *ti, status_type_t type, #if LINUX_VERSION_CODE >= KERNEL_VERSION(3, 6, 0) unsigned flags, #endif char *result, unsigned maxlen) { unsigned int sz = 0; struct wb_device *wb = ti->private; struct wb_cache *cache = wb->cache; size_t i; switch (type) { case STATUSTYPE_INFO: DMEMIT("%llu %llu %llu %llu %llu %u ", (long long unsigned int) atomic64_read(&wb->nr_dirty_caches), (long long unsigned int) cache->nr_segments, (long long unsigned int) cache->last_migrated_segment_id, (long long unsigned int) cache->last_flushed_segment_id, (long long unsigned int) cache->current_seg->global_id, (unsigned int) cache->cursor); for (i = 0; i < STATLEN; i++) { atomic64_t *v; if (i == (STATLEN-1)) break; v = &cache->stat[i]; DMEMIT("%lu ", atomic64_read(v)); } DMEMIT("%d ", 7); DMEMIT("barrier_deadline_ms %lu ", cache->barrier_deadline_ms); DMEMIT("allow_migrate %d ", cache->allow_migrate ? 1 : 0); DMEMIT("enable_migration_modulator %d ", cache->enable_migration_modulator ? 
1 : 0); DMEMIT("migrate_threshold %d ", wb->migrate_threshold); DMEMIT("nr_cur_batched_migration %lu ", cache->nr_cur_batched_migration); DMEMIT("sync_interval %lu ", cache->sync_interval); DMEMIT("update_record_interval %lu", cache->update_record_interval); break; case STATUSTYPE_TABLE: DMEMIT("%s %s", wb->device->name, wb->cache->device->name); break; } #if LINUX_VERSION_CODE < KERNEL_VERSION(3, 8, 0) return 0; #endif } static struct target_type writeboost_target = { .name = "writeboost", .version = {0, 1, 0}, .module = THIS_MODULE, .map = writeboost_map, .ctr = writeboost_ctr, .dtr = writeboost_dtr, .end_io = writeboost_end_io, .merge = writeboost_merge, .message = writeboost_message, .status = writeboost_status, .io_hints = writeboost_io_hints, .iterate_devices = writeboost_iterate_devices, }; struct dm_io_client *wb_io_client; struct workqueue_struct *safe_io_wq; static int __init writeboost_module_init(void) { int r = 0; r = dm_register_target(&writeboost_target); if (r < 0) { WBERR("%d", r); return r; } r = -ENOMEM; safe_io_wq = alloc_workqueue("safeiowq", WQ_NON_REENTRANT | WQ_MEM_RECLAIM, 0); if (!safe_io_wq) { WBERR(); goto bad_wq; } wb_io_client = dm_io_client_create(); if (IS_ERR(wb_io_client)) { WBERR(); r = PTR_ERR(wb_io_client); goto bad_io_client; } return 0; bad_io_client: destroy_workqueue(safe_io_wq); bad_wq: dm_unregister_target(&writeboost_target); return r; } static void __exit writeboost_module_exit(void) { dm_io_client_destroy(wb_io_client); destroy_workqueue(safe_io_wq); dm_unregister_target(&writeboost_target); } module_init(writeboost_module_init); module_exit(writeboost_module_exit); MODULE_AUTHOR("Akira Hayakawa <ruby.wktk@xxxxxxxxx>"); MODULE_DESCRIPTION(DM_NAME " writeboost target"); MODULE_LICENSE("GPL"); ---------- util.c ---------- /* * Copyright (C) 2012-2013 Akira Hayakawa <ruby.wktk@xxxxxxxxx> * * This file is released under the GPL. 
*/ #include "util.h" void *do_kmalloc_retry(size_t size, gfp_t flags, int lineno) { size_t count = 0; void *p; retry_alloc: p = kmalloc(size, flags); if (!p) { count++; WBWARN("L%d size:%lu, count:%lu", lineno, size, count); schedule_timeout_interruptible(msecs_to_jiffies(1)); goto retry_alloc; } return p; } struct safe_io { struct work_struct work; int err; unsigned long err_bits; struct dm_io_request *io_req; unsigned num_regions; struct dm_io_region *regions; }; static void safe_io_proc(struct work_struct *work) { struct safe_io *io = container_of(work, struct safe_io, work); io->err_bits = 0; io->err = dm_io(io->io_req, io->num_regions, io->regions, &io->err_bits); } /* * dm_io wrapper. * @thread run this operation in other thread to avoid deadlock. */ int dm_safe_io_internal( struct dm_io_request *io_req, unsigned num_regions, struct dm_io_region *regions, unsigned long *err_bits, bool thread, int lineno) { int err; dev_t dev; if (thread) { struct safe_io io = { .io_req = io_req, .regions = regions, .num_regions = num_regions, }; INIT_WORK_ONSTACK(&io.work, safe_io_proc); queue_work(safe_io_wq, &io.work); flush_work(&io.work); err = io.err; if (err_bits) *err_bits = io.err_bits; } else { err = dm_io(io_req, num_regions, regions, err_bits); } dev = regions->bdev->bd_dev; /* dm_io routines permits NULL for err_bits pointer. 
*/ if (err || (err_bits && *err_bits)) { unsigned long eb; if (!err_bits) eb = (~(unsigned long)0); else eb = *err_bits; WBERR("L%d err(%d, %lu), rw(%d), sector(%lu), dev(%u:%u)", lineno, err, eb, io_req->bi_rw, regions->sector, MAJOR(dev), MINOR(dev)); } return err; } void dm_safe_io_retry_internal( struct dm_io_request *io_req, unsigned num_regions, struct dm_io_region *regions, bool thread, int lineno) { int err, count = 0; unsigned long err_bits; dev_t dev; retry_io: err_bits = 0; err = dm_safe_io_internal(io_req, num_regions, regions, &err_bits, thread, lineno); dev = regions->bdev->bd_dev; if (err || err_bits) { count++; WBWARN("L%d count(%d)", lineno, count); schedule_timeout_interruptible(msecs_to_jiffies(1000)); goto retry_io; } if (count) { WBWARN("L%d rw(%d), sector(%lu), dev(%u:%u)", lineno, io_req->bi_rw, regions->sector, MAJOR(dev), MINOR(dev)); } } sector_t dm_devsize(struct dm_dev *dev) { return i_size_read(dev->bdev->bd_inode) >> SECTOR_SHIFT; } -- dm-devel mailing list dm-devel@xxxxxxxxxx https://www.redhat.com/mailman/listinfo/dm-devel