On Wed, Jun 29, 2011 at 05:25:17PM +0200, Andrea Righi wrote: [..] > > +void > > +blk_throtl_dirty_pages(struct address_space *mapping, unsigned long nr_dirty) > > +{ > > + struct request_queue *q; > > + struct block_device *bdev; > > + > > + bdev = mapping->host->i_sb->s_bdev; > > + if (!bdev) > > + return; > > + q = bdev_get_queue(bdev); > > + > > + if (unlikely(!q)) > > + return; > > + > > + if (unlikely(test_bit(QUEUE_FLAG_DEAD, &q->queue_flags))) > > + return; > > + > > + __blk_throtl_dirty_pages(q, nr_dirty); > > +} > > This doesn't throttle buffered WRITEs to raw devices. Maybe we should do > something like this. > Thanks Andrea. I have updated the patch to fix both the issues pointed by you (raw device fix as well as compile warning). I also have added your name in Signed-off-by, in core patch as you provided this raw device fix. Thanks Vivek Subject: blk-throttle: core logic to throttle task while dirtying pages This is the core logic which enables throttling logic to also throttle tasks while dirtying pages. I basically create the infrastructure where a wait queue is maintained per group and if task exceeds its rate limit, task is put to sleep on wait queue and woken up later. Though currently we throttle tasks for WRITE requests and not for READ requests, I have built the logic to support task throttling for both direction (similar to bio throttling). This will allow us to do any task READ throttling also easily, if needed in future. Signed-off-by: Vivek Goyal <vgoyal@xxxxxxxxxx> Signed-off-by: Andrea Righi <andrea@xxxxxxxxxxxxxxx> --- block/blk-throttle.c | 405 +++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 364 insertions(+), 41 deletions(-) Index: linux-2.6-block/block/blk-throttle.c =================================================================== --- linux-2.6-block.orig/block/blk-throttle.c 2011-06-29 15:45:46.415553948 -0400 +++ linux-2.6-block/block/blk-throttle.c 2011-06-29 15:49:52.592309807 -0400 @@ -32,6 +32,12 @@ struct throtl_rb_root { unsigned long min_disptime; }; +/* Whether to dispatch a bio or task in next round of a dispatch from a group */ +enum dispatch_type { + DISPATCH_BIO, + DISPATCH_TASK, +}; + #define THROTL_RB_ROOT (struct throtl_rb_root) { .rb = RB_ROOT, .left = NULL, \ .count = 0, .min_disptime = 0} @@ -59,7 +65,10 @@ struct throtl_grp { struct bio_list bio_lists[2]; /* Number of queued bios on READ and WRITE lists */ - unsigned int nr_queued[2]; + unsigned int nr_queued_bio[2]; + + /* Number of queued tasks */ + unsigned int nr_queued_tsk[2]; /* bytes per second rate limits */ uint64_t bps[2]; @@ -80,6 +89,21 @@ struct throtl_grp { int limits_changed; struct rcu_head rcu_head; + + /* READ and WRITE wait lists for task throttle */ + wait_queue_head_t wait[2]; + + /* Number of unaccounted dirty pages in this group */ + unsigned int unaccounted_dirty; + + /* The number of pages to be charged to first task on wait queue */ + unsigned int active_task_charge[2]; + + /* + * Keeps track of whether we will dispatch a bio or task in next + * round of dispatch (for each dir) + */ + enum dispatch_type active_dispatch[2]; }; struct throtl_data @@ -94,7 +118,10 @@ struct throtl_data struct request_queue *queue; /* Total Number of queued bios on READ and WRITE lists */ - unsigned int nr_queued[2]; + unsigned int nr_queued_bio[2]; + + /* Total Number of queued tasks on READ and WRITE lists */ + unsigned int nr_queued_tsk[2]; /* * number of total undestroyed groups @@ -142,9 +169,19 @@ static inline struct throtl_grp *tg_of_b return NULL; } +static inline int total_queued_bios(struct throtl_data *td) +{ + return td->nr_queued_bio[0] + td->nr_queued_bio[1]; +} + +static inline int total_queued_tasks(struct throtl_data *td) +{ + return td->nr_queued_tsk[0] + td->nr_queued_tsk[1]; +} + static inline unsigned int total_nr_queued(struct throtl_data *td) { - return td->nr_queued[0] + td->nr_queued[1]; + return total_queued_bios(td) + total_queued_tasks(td); } static inline struct throtl_grp *throtl_ref_get_tg(struct throtl_grp *tg) @@ -192,6 +229,9 @@ static void throtl_init_group(struct thr tg->bps[0] = tg->bps[1] = -1; tg->iops[0] = tg->iops[1] = -1; + init_waitqueue_head(&tg->wait[READ]); + init_waitqueue_head(&tg->wait[WRITE]); + /* * Take the initial reference that will be released on destroy * This can be thought of a joint reference by cgroup and @@ -727,6 +767,14 @@ static void throtl_charge_io(struct thro blkiocg_update_dispatch_stats(&tg->blkg, sz, nr_ios, rw, sync); } +static void throtl_charge_dirty_io(struct throtl_grp *tg, bool rw, + unsigned int sz, unsigned int nr_ios, bool sync) +{ + tg->unaccounted_dirty -= nr_ios; + BUG_ON(tg->unaccounted_dirty < 0); + throtl_charge_io(tg, rw, sz, nr_ios, sync); +} + static void throtl_add_bio_tg(struct throtl_data *td, struct throtl_grp *tg, struct bio *bio) { @@ -735,21 +783,38 @@ static void throtl_add_bio_tg(struct thr bio_list_add(&tg->bio_lists[rw], bio); /* Take a bio reference on tg */ throtl_ref_get_tg(tg); - tg->nr_queued[rw]++; - td->nr_queued[rw]++; + tg->nr_queued_bio[rw]++; + td->nr_queued_bio[rw]++; throtl_enqueue_tg(td, tg); } -static void tg_update_disptime(struct throtl_data *td, struct throtl_grp *tg) +/* + * Returns the wait time of first bio or task in the specified direction. If + * nothing is queued then wait time is -1. + */ +static unsigned long +tg_active_wait_time(struct throtl_data *td, struct throtl_grp *tg, bool rw) { - unsigned long read_wait = -1, write_wait = -1, min_wait = -1, disptime; struct bio *bio; + unsigned long sz; + + bio = bio_list_peek(&tg->bio_lists[rw]); + if (bio && tg->active_dispatch[rw] == DISPATCH_BIO) + return tg_wait_dispatch(td, tg, rw, bio->bi_size, 1); + + if (!waitqueue_active(&tg->wait[rw])) + return -1; - if ((bio = bio_list_peek(&tg->bio_lists[READ]))) - read_wait = tg_wait_dispatch(td, tg, READ, bio->bi_size, 1); + sz = tg->active_task_charge[rw] << PAGE_SHIFT; + return tg_wait_dispatch(td, tg, rw, sz, tg->active_task_charge[rw]); +} - if ((bio = bio_list_peek(&tg->bio_lists[WRITE]))) - write_wait = tg_wait_dispatch(td, tg, WRITE, bio->bi_size, 1); +static void tg_update_disptime(struct throtl_data *td, struct throtl_grp *tg) +{ + unsigned long read_wait, write_wait, min_wait, disptime; + + read_wait = tg_active_wait_time(td, tg, READ); + write_wait = tg_active_wait_time(td, tg, WRITE); min_wait = min(read_wait, write_wait); disptime = jiffies + min_wait; @@ -760,18 +825,39 @@ static void tg_update_disptime(struct th throtl_enqueue_tg(td, tg); } +/* Calculate how many pages must be charged to a task at this point of time. */ +static unsigned int tg_calc_charge_pages(struct throtl_grp *tg, bool rw) +{ + unsigned int nr_pages; + + /* Divide unaccounted number of pages among queued tasks */ + nr_pages = tg->unaccounted_dirty/tg->nr_queued_tsk[rw]; + + return max_t(unsigned int, nr_pages, 1); +} + +/* set the active task charge for the first task in the queue */ +static void tg_set_active_task_charge(struct throtl_grp *tg, bool rw) +{ + /* If there are more tasks queued, calculate the charge */ + if (tg->nr_queued_tsk[rw]) + tg->active_task_charge[rw] = tg_calc_charge_pages(tg, rw); + else + tg->active_task_charge[rw] = 0; +} + static void tg_dispatch_one_bio(struct throtl_data *td, struct throtl_grp *tg, bool rw, struct bio_list *bl) { struct bio *bio; bio = bio_list_pop(&tg->bio_lists[rw]); - tg->nr_queued[rw]--; + tg->nr_queued_bio[rw]--; /* Drop bio reference on tg */ throtl_put_tg(tg); - BUG_ON(td->nr_queued[rw] <= 0); - td->nr_queued[rw]--; + BUG_ON(td->nr_queued_bio[rw] <= 0); + td->nr_queued_bio[rw]--; throtl_charge_io(tg, rw, bio->bi_size, 1, bio->bi_rw & REQ_SYNC); bio_list_add(bl, bio); @@ -780,39 +866,137 @@ static void tg_dispatch_one_bio(struct t throtl_trim_slice(td, tg, rw); } -static int throtl_dispatch_tg(struct throtl_data *td, struct throtl_grp *tg, - struct bio_list *bl) +static int throtl_dispatch_tg_bio(struct throtl_data *td, struct throtl_grp *tg, + bool rw, struct bio_list *bl, unsigned int max_disp) { - unsigned int nr_reads = 0, nr_writes = 0; - unsigned int max_nr_reads = throtl_grp_quantum*3/4; - unsigned int max_nr_writes = throtl_grp_quantum - max_nr_reads; struct bio *bio; + unsigned int nr_disp = 0; - /* Try to dispatch 75% READS and 25% WRITES */ - - while ((bio = bio_list_peek(&tg->bio_lists[READ])) - && !tg_wait_dispatch(td, tg, READ, bio->bi_size, 1)) { + while ((bio = bio_list_peek(&tg->bio_lists[rw])) + && !tg_wait_dispatch(td, tg, rw, bio->bi_size, 1)) { tg_dispatch_one_bio(td, tg, bio_data_dir(bio), bl); - nr_reads++; + nr_disp++; - if (nr_reads >= max_nr_reads) + if (nr_disp >= max_disp) break; } - while ((bio = bio_list_peek(&tg->bio_lists[WRITE])) - && !tg_wait_dispatch(td, tg, WRITE, bio->bi_size, 1)) { + return nr_disp; +} - tg_dispatch_one_bio(td, tg, bio_data_dir(bio), bl); - nr_writes++; +static void throtl_dispatch_tg_task(struct throtl_data *td, + struct throtl_grp *tg, bool rw, unsigned int max_disp) +{ + unsigned int sz; - if (nr_writes >= max_nr_writes) - break; + if (!waitqueue_active(&tg->wait[rw])) + return; + + /* Wake up a task */ + wake_up(&tg->wait[rw]); + + tg->nr_queued_tsk[rw]--; + td->nr_queued_tsk[rw]--; + + /* Charge the woken up task for IO */ + sz = tg->active_task_charge[rw] << PAGE_SHIFT; + throtl_charge_dirty_io(tg, rw, sz, tg->active_task_charge[rw], false); + throtl_trim_slice(td, tg, rw); + + /* + * We dispatched one task. Set the charge for other queued tasks, + * if any. + */ + tg_set_active_task_charge(tg, rw); + throtl_log_tg(td, tg, "[%c] Wake up a task. bdisp=%llu sz=%u bps=%llu" + " iodisp=%u iops=%u bioq=%d/%d taskq=%d/%d", + rw == READ ? 'R' : 'W', tg->bytes_disp[rw], sz, + tg->bps[rw], tg->io_disp[rw], tg->iops[rw], + tg->nr_queued_bio[READ], tg->nr_queued_bio[WRITE], + tg->nr_queued_tsk[READ], tg->nr_queued_tsk[WRITE]); +} + +static void tg_switch_active_dispatch(struct throtl_data *td, + struct throtl_grp *tg, bool rw) +{ + unsigned int nr_tasks = tg->nr_queued_tsk[rw]; + unsigned int nr_bios = tg->nr_queued_bio[rw]; + enum dispatch_type curr_dispatch = tg->active_dispatch[rw]; + + /* Nothing queued. Whoever gets queued next first, sets dispatch type */ + if (!nr_bios && !nr_tasks) + return; + + if (curr_dispatch == DISPATCH_BIO && nr_tasks) { + tg->active_dispatch[rw] = DISPATCH_TASK; + return; + } + + if (curr_dispatch == DISPATCH_TASK && nr_bios) + tg->active_dispatch[rw] = DISPATCH_BIO; +} + +static void tg_update_active_dispatch(struct throtl_data *td, + struct throtl_grp *tg, bool rw) +{ + unsigned int nr_tasks = tg->nr_queued_tsk[rw]; + unsigned int nr_bios = tg->nr_queued_bio[rw]; + enum dispatch_type curr_dispatch = tg->active_dispatch[rw]; + + BUG_ON(nr_bios < 0 || nr_tasks < 0); + + if (curr_dispatch == DISPATCH_BIO && !nr_bios) { + tg->active_dispatch[rw] = DISPATCH_TASK; + return; } + if (curr_dispatch == DISPATCH_TASK && !nr_tasks) + tg->active_dispatch[rw] = DISPATCH_BIO; +} + +static int throtl_dispatch_tg_rw(struct throtl_data *td, struct throtl_grp *tg, + bool rw, struct bio_list *bl, unsigned int max) +{ + unsigned int nr_disp = 0; + + if (tg->active_dispatch[rw] == DISPATCH_BIO) + nr_disp = throtl_dispatch_tg_bio(td, tg, rw, bl, max); + else + /* Only number of bios dispatched is kept track of here */ + throtl_dispatch_tg_task(td, tg, rw, max); + + tg_switch_active_dispatch(td, tg, rw); + return nr_disp; +} + +static int throtl_dispatch_tg(struct throtl_data *td, struct throtl_grp *tg, + struct bio_list *bl) +{ + /* Try to dispatch 75% READS and 25% WRITES */ + unsigned int nr_reads = 0, nr_writes = 0; + unsigned int max_nr_reads = throtl_grp_quantum*3/4; + unsigned int max_nr_writes = throtl_grp_quantum - max_nr_reads; + + nr_reads = throtl_dispatch_tg_rw(td, tg, READ, bl, max_nr_reads); + nr_writes = throtl_dispatch_tg_rw(td, tg, WRITE, bl, max_nr_writes); + return nr_reads + nr_writes; } +static bool tg_should_requeue(struct throtl_grp *tg) +{ + /* If there are queued bios, requeue */ + if (tg->nr_queued_bio[0] || tg->nr_queued_bio[1]) + return 1; + + /* If there are queued tasks reueue */ + if (tg->nr_queued_tsk[0] || tg->nr_queued_tsk[1]) + return 1; + + return 0; +} + static int throtl_select_dispatch(struct throtl_data *td, struct bio_list *bl) { unsigned int nr_disp = 0; @@ -832,7 +1016,7 @@ static int throtl_select_dispatch(struct nr_disp += throtl_dispatch_tg(td, tg, bl); - if (tg->nr_queued[0] || tg->nr_queued[1]) { + if (tg_should_requeue(tg)) { tg_update_disptime(td, tg); throtl_enqueue_tg(td, tg); } @@ -899,9 +1083,9 @@ static int throtl_dispatch(struct reques bio_list_init(&bio_list_on_stack); - throtl_log(td, "dispatch nr_queued=%u read=%u write=%u", - total_nr_queued(td), td->nr_queued[READ], - td->nr_queued[WRITE]); + throtl_log(td, "dispatch bioq=%u/%u tskq=%u/%u", + td->nr_queued_bio[READ], td->nr_queued_bio[WRITE], + td->nr_queued_tsk[READ], td->nr_queued_tsk[WRITE]); nr_disp = throtl_select_dispatch(td, &bio_list_on_stack); @@ -1122,7 +1306,7 @@ int blk_throtl_bio(struct request_queue if (tg_no_rule_group(tg, rw)) { blkiocg_update_dispatch_stats(&tg->blkg, bio->bi_size, - rw, bio->bi_rw & REQ_SYNC); + 1, rw, bio->bi_rw & REQ_SYNC); rcu_read_unlock(); return 0; } @@ -1146,14 +1330,14 @@ int blk_throtl_bio(struct request_queue } } - if (tg->nr_queued[rw]) { + /* If there are already queued bio or task in same dir, queue bio */ + if (tg->nr_queued_bio[rw] || tg->nr_queued_tsk[rw]) { /* - * There is already another bio queued in same dir. No + * There is already another bio/task queued in same dir. No * need to update dispatch time. */ update_disptime = false; goto queue_bio; - } /* Bio is with-in rate limit of group */ @@ -1178,16 +1362,18 @@ int blk_throtl_bio(struct request_queue queue_bio: throtl_log_tg(td, tg, "[%c] bio. bdisp=%llu sz=%u bps=%llu" - " iodisp=%u iops=%u queued=%d/%d", + " iodisp=%u iops=%u bioq=%u/%u taskq=%u/%u", rw == READ ? 'R' : 'W', tg->bytes_disp[rw], bio->bi_size, tg->bps[rw], tg->io_disp[rw], tg->iops[rw], - tg->nr_queued[READ], tg->nr_queued[WRITE]); + tg->nr_queued_bio[READ], tg->nr_queued_bio[WRITE], + tg->nr_queued_tsk[READ], tg->nr_queued_tsk[WRITE]); throtl_add_bio_tg(q->td, tg, bio); *biop = NULL; if (update_disptime) { + tg_update_active_dispatch(td, tg, rw); tg_update_disptime(td, tg); throtl_schedule_next_dispatch(td); } @@ -1197,6 +1383,143 @@ out: return 0; } +static void +__blk_throtl_dirty_pages(struct request_queue *q, unsigned long nr_dirty) +{ + struct throtl_data *td = q->td; + struct throtl_grp *tg; + struct blkio_cgroup *blkcg; + bool rw = WRITE, update_disptime = true, first_task = false; + unsigned int sz = nr_dirty << PAGE_SHIFT; + DEFINE_WAIT(wait); + + /* + * A throtl_grp pointer retrieved under rcu can be used to access + * basic fields like stats and io rates. If a group has no rules, + * just update the dispatch stats in lockless manner and return. + */ + + rcu_read_lock(); + blkcg = task_blkio_cgroup(current); + tg = throtl_find_tg(td, blkcg); + if (tg) { + throtl_tg_fill_dev_details(td, tg); + + if (tg_no_rule_group(tg, rw)) { + blkiocg_update_dispatch_stats(&tg->blkg, sz, nr_dirty, + rw, 0); + rcu_read_unlock(); + return; + } + } + rcu_read_unlock(); + + spin_lock_irq(q->queue_lock); + + tg = throtl_get_tg(td); + + /* Queue is gone. No queue lock held here. */ + if (IS_ERR(tg)) + return; + + tg->unaccounted_dirty += nr_dirty; + + /* If there are already queued task, put this task also on waitq */ + if (tg->nr_queued_tsk[rw]) { + update_disptime = false; + goto queue_task; + } else + first_task = true; + + /* If there are bios already throttled in same dir, queue task */ + if (!bio_list_empty(&tg->bio_lists[rw])) { + update_disptime = false; + goto queue_task; + } + + /* + * Task is with-in rate limit of group. + * + * Note: How many IOPS we should charge for this operation. For + * the time being I am sticking to number of pages as number of + * IOPS. + */ + if (!tg_wait_dispatch(td, tg, rw, sz, nr_dirty)) { + throtl_charge_dirty_io(tg, rw, sz, nr_dirty, 0); + throtl_trim_slice(td, tg, rw); + goto out; + } + +queue_task: + throtl_log_tg(td, tg, "[%c] task. bdisp=%llu sz=%u bps=%llu" + " iodisp=%u iops=%u bioq=%d/%d taskq=%d/%d", + rw == READ ? 'R' : 'W', + tg->bytes_disp[rw], sz, tg->bps[rw], + tg->io_disp[rw], tg->iops[rw], + tg->nr_queued_bio[READ], tg->nr_queued_bio[WRITE], + tg->nr_queued_tsk[READ], tg->nr_queued_tsk[WRITE]); + + /* Take a task reference on tg */ + throtl_ref_get_tg(tg); + tg->nr_queued_tsk[rw]++; + td->nr_queued_tsk[rw]++; + prepare_to_wait_exclusive(&tg->wait[rw], &wait, TASK_UNINTERRUPTIBLE); + + if (first_task) + tg_set_active_task_charge(tg, rw); + + if (update_disptime) { + tg_update_active_dispatch(td, tg, rw); + tg_update_disptime(td, tg); + throtl_schedule_next_dispatch(td); + } else + throtl_enqueue_tg(td, tg); + + spin_unlock_irq(q->queue_lock); + + /* Task has been put on a wait queue */ + io_schedule(); + + /* + * Task has woken up. Finish wait, drop reference to the group. + */ + spin_lock_irq(q->queue_lock); + finish_wait(&tg->wait[rw], &wait); + + /* Drop task reference on group */ + throtl_put_tg(tg); +out: + spin_unlock_irq(q->queue_lock); + return; +} + +void +blk_throtl_dirty_pages(struct address_space *mapping, unsigned long nr_dirty) +{ + struct request_queue *q; + struct block_device *bdev; + struct inode *inode = mapping->host; + + if (S_ISBLK(inode->i_mode) && inode->i_bdev) { + bdev = inode->i_bdev; + } else { + bdev = inode->i_sb->s_bdev; + if (!bdev) + return; + } + + q = bdev_get_queue(bdev); + + if (unlikely(!q)) + return; + + if (unlikely(test_bit(QUEUE_FLAG_DEAD, &q->queue_flags))) + return; + + __blk_throtl_dirty_pages(q, nr_dirty); +} + + int blk_throtl_init(struct request_queue *q) { struct throtl_data *td; -- To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in the body of a message to majordomo@xxxxxxxxxxxxxxx More majordomo info at http://vger.kernel.org/majordomo-info.html