Re: [PATCH 07/12] writeback: support > 1 flusher thread per bdi

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Tue, May 26, 2009 at 11:33:45AM +0200, Jens Axboe wrote:
> Build on the bdi_writeback support by allowing registration of
> more than 1 flusher thread. File systems can call bdi_add_flusher_task(bdi)
> to add more flusher threads to the device. If they do so, they must also
> provide a super_operations function to return the suitable bdi_writeback
> struct from any given inode.

Looks good from an RCU perspective.  SRCU used for wb_list, RCU for the
other RCU-protected data.  ;-)

							Thanx, Paul

> Signed-off-by: Jens Axboe <jens.axboe@xxxxxxxxxx>
> ---
>  fs/fs-writeback.c           |  448 +++++++++++++++++++++++++++++++++++--------
>  include/linux/backing-dev.h |   34 +++-
>  include/linux/fs.h          |    3 +
>  mm/backing-dev.c            |  233 ++++++++++++++++++-----
>  mm/page-writeback.c         |    4 +-
>  5 files changed, 586 insertions(+), 136 deletions(-)
> 
> diff --git a/fs/fs-writeback.c b/fs/fs-writeback.c
> index e72db8b..8e0902e 100644
> --- a/fs/fs-writeback.c
> +++ b/fs/fs-writeback.c
> @@ -34,83 +34,247 @@
>   */
>  int nr_pdflush_threads;
> 
> -/**
> - * writeback_acquire - attempt to get exclusive writeback access to a device
> - * @bdi: the device's backing_dev_info structure
> - *
> - * It is a waste of resources to have more than one pdflush thread blocked on
> - * a single request queue.  Exclusion at the request_queue level is obtained
> - * via a flag in the request_queue's backing_dev_info.state.
> - *
> - * Non-request_queue-backed address_spaces will share default_backing_dev_info,
> - * unless they implement their own.  Which is somewhat inefficient, as this
> - * may prevent concurrent writeback against multiple devices.
> +static void generic_sync_wb_inodes(struct bdi_writeback *wb,
> +				   struct super_block *sb,
> +				   struct writeback_control *wbc);
> +
> +/*
> + * Work items for the bdi_writeback threads
>   */
> -static int writeback_acquire(struct bdi_writeback *wb)
> +struct bdi_work {
> +	struct list_head list;
> +	struct list_head wait_list;
> +	struct rcu_head rcu_head;
> +
> +	unsigned long seen;
> +	atomic_t pending;
> +
> +	unsigned long sb_data;
> +	unsigned long nr_pages;
> +	enum writeback_sync_modes sync_mode;
> +
> +	unsigned long state;
> +};
> +
> +static struct super_block *bdi_work_sb(struct bdi_work *work)
>  {
> -	struct backing_dev_info *bdi = wb->bdi;
> +	return (struct super_block *) (work->sb_data & ~1UL);
> +}
> +
> +static inline bool bdi_work_on_stack(struct bdi_work *work)
> +{
> +	return work->sb_data & 1UL;
> +}
> +
> +static inline void bdi_work_init(struct bdi_work *work, struct super_block *sb,
> +				 unsigned long nr_pages,
> +				 enum writeback_sync_modes sync_mode)
> +{
> +	INIT_RCU_HEAD(&work->rcu_head);
> +	work->sb_data = (unsigned long) sb;
> +	work->nr_pages = nr_pages;
> +	work->sync_mode = sync_mode;
> +	work->state = 1;
> +
> +	/*
> +	 * state must not be reordered around the insert
> +	 */
> +	smp_mb();
> +}
> 
> -	return !test_and_set_bit(wb->nr, &bdi->wb_active);
> +static inline void bdi_work_init_on_stack(struct bdi_work *work,
> +					  struct super_block *sb,
> +					  unsigned long nr_pages,
> +					  enum writeback_sync_modes sync_mode)
> +{
> +	bdi_work_init(work, sb, nr_pages, sync_mode);
> +	work->sb_data |= 1UL;
>  }
> 
>  /**
>   * writeback_in_progress - determine whether there is writeback in progress
>   * @bdi: the device's backing_dev_info structure.
>   *
> - * Determine whether there is writeback in progress against a backing device.
> + * Determine whether there is writeback waiting to be handled against a
> + * backing device.
>   */
>  int writeback_in_progress(struct backing_dev_info *bdi)
>  {
> -	return bdi->wb_active != 0;
> +	return !list_empty(&bdi->work_list);
>  }
> 
> -/**
> - * writeback_release - relinquish exclusive writeback access against a device.
> - * @bdi: the device's backing_dev_info structure
> - */
> -static void writeback_release(struct bdi_writeback *wb)
> +static void bdi_work_clear(struct bdi_work *work)
>  {
> -	struct backing_dev_info *bdi = wb->bdi;
> +	clear_bit(0, &work->state);
> +	smp_mb__after_clear_bit();
> +	wake_up_bit(&work->state, 0);
> +}
> +
> +static void bdi_work_free(struct rcu_head *head)
> +{
> +	struct bdi_work *work = container_of(head, struct bdi_work, rcu_head);
> 
> -	wb->nr_pages = 0;
> -	wb->sb = NULL;
> -	clear_bit(wb->nr, &bdi->wb_active);
> +	if (!bdi_work_on_stack(work))
> +		kfree(work);
> +	else
> +		bdi_work_clear(work);
>  }
> 
> -static void wb_start_writeback(struct bdi_writeback *wb, struct super_block *sb,
> -			       long nr_pages,
> -			       enum writeback_sync_modes sync_mode)
> +static void wb_work_complete(struct bdi_work *work)
>  {
> -	if (!wb_has_dirty_io(wb))
> -		return;
> +	if (!bdi_work_on_stack(work)) {
> +		bdi_work_clear(work);
> +
> +		if (work->sync_mode == WB_SYNC_NONE)
> +			call_rcu(&work->rcu_head, bdi_work_free);
> +	} else
> +		call_rcu(&work->rcu_head, bdi_work_free);
> +}
> +
> +static void wb_clear_pending(struct bdi_writeback *wb, struct bdi_work *work)
> +{
> +	/*
> +	 * The caller has retrieved the work arguments from this work,
> +	 * drop our reference. If this is the last ref, delete and free it
> +	 */
> +	if (atomic_dec_and_test(&work->pending)) {
> +		struct backing_dev_info *bdi = wb->bdi;
> 
> -	if (writeback_acquire(wb)) {
> -		wb->nr_pages = nr_pages;
> -		wb->sb = sb;
> -		wb->sync_mode = sync_mode;
> +		spin_lock(&bdi->wb_lock);
> +		list_del_rcu(&work->list);
> +		spin_unlock(&bdi->wb_lock);
> +
> +		wb_work_complete(work);
> +	}
> +}
> +
> +static void wb_start_writeback(struct bdi_writeback *wb, struct bdi_work *work)
> +{
> +	/*
> +	 * If we failed allocating the bdi work item, wake up the wb thread
> +	 * always. As a safety precaution, it'll flush out everything
> +	 */
> +	if (!wb_has_dirty_io(wb) && work)
> +		wb_clear_pending(wb, work);
> +	else
> +		wake_up(&wb->wait);
> +}
> +
> +static void bdi_queue_work(struct backing_dev_info *bdi, struct bdi_work *work)
> +{
> +	if (work) {
> +		work->seen = bdi->wb_mask;
> +		atomic_set(&work->pending, bdi->wb_cnt);
> 
>  		/*
> -		 * make above store seen before the task is woken
> +		 * Make sure stores are seen before it appears on the list
>  		 */
>  		smp_mb();
> -		wake_up(&wb->wait);
> +
> +		spin_lock(&bdi->wb_lock);
> +		list_add_tail_rcu(&work->list, &bdi->work_list);
> +		spin_unlock(&bdi->wb_lock);
>  	}
>  }
> 
> -int bdi_start_writeback(struct backing_dev_info *bdi, struct super_block *sb,
> -			 long nr_pages, enum writeback_sync_modes sync_mode)
> +static void bdi_sched_work(struct backing_dev_info *bdi, struct bdi_work *work)
> +{
> +	if (!bdi_wblist_needs_lock(bdi))
> +		wb_start_writeback(&bdi->wb, work);
> +	else {
> +		struct bdi_writeback *wb;
> +		int idx;
> +
> +		idx = srcu_read_lock(&bdi->srcu);
> +
> +		list_for_each_entry_rcu(wb, &bdi->wb_list, list)
> +			wb_start_writeback(wb, work);
> +
> +		srcu_read_unlock(&bdi->srcu, idx);
> +	}
> +}
> +
> +static void __bdi_start_work(struct backing_dev_info *bdi,
> +			     struct bdi_work *work)
> +{
> +	/*
> +	 * If the default thread isn't there, make sure we add it. When
> +	 * it gets created and wakes up, we'll run this work.
> +	 */
> +	if (unlikely(list_empty_careful(&bdi->wb_list)))
> +		bdi_add_default_flusher_task(bdi);
> +	else
> +		bdi_sched_work(bdi, work);
> +}
> +
> +static void bdi_start_work(struct backing_dev_info *bdi, struct bdi_work *work)
>  {
>  	/*
> -	 * This only happens the first time someone kicks this bdi, so put
> -	 * it out-of-line.
> +	 * If the default thread isn't there, make sure we add it. When
> +	 * it gets created and wakes up, we'll run this work.
>  	 */
> -	if (unlikely(!bdi->wb.task)) {
> +	if (unlikely(list_empty_careful(&bdi->wb_list))) {
> +		mutex_lock(&bdi_lock);
>  		bdi_add_default_flusher_task(bdi);
> -		return 1;
> +		mutex_unlock(&bdi_lock);
> +	} else
> +		bdi_sched_work(bdi, work);
> +}
> +
> +/*
> + * Used for on-stack allocated work items. The caller needs to wait until
> + * the wb threads have acked the work before it's safe to continue.
> + */
> +static void bdi_wait_on_work_clear(struct bdi_work *work)
> +{
> +	wait_on_bit(&work->state, 0, bdi_sched_wait, TASK_UNINTERRUPTIBLE);
> +}
> +
> +static struct bdi_work *bdi_alloc_work(struct super_block *sb, long nr_pages,
> +				       enum writeback_sync_modes sync_mode)
> +{
> +	struct bdi_work *work;
> +
> +	work = kmalloc(sizeof(*work), GFP_ATOMIC);
> +	if (work)
> +		bdi_work_init(work, sb, nr_pages, sync_mode);
> +
> +	return work;
> +}
> +
> +void bdi_start_writeback(struct backing_dev_info *bdi, struct super_block *sb,
> +			 long nr_pages, enum writeback_sync_modes sync_mode)
> +{
> +	const bool must_wait = sync_mode == WB_SYNC_ALL;
> +	struct bdi_work work_stack, *work = NULL;
> +
> +	if (!must_wait)
> +		work = bdi_alloc_work(sb, nr_pages, sync_mode);
> +
> +	if (!work) {
> +		work = &work_stack;
> +		bdi_work_init_on_stack(work, sb, nr_pages, sync_mode);
>  	}
> 
> -	wb_start_writeback(&bdi->wb, sb, nr_pages, sync_mode);
> -	return 0;
> +	bdi_queue_work(bdi, work);
> +	bdi_start_work(bdi, work);
> +
> +	/*
> +	 * If the sync mode is WB_SYNC_ALL, block waiting for the work to
> +	 * complete. If not, we only need to wait for the work to be started,
> +	 * if we allocated it on-stack. We use the same mechanism, if the
> +	 * wait bit is set in the bdi_work struct, then threads will not
> +	 * clear pending until after they are done.
> +	 *
> +	 * Note that work == &work_stack if must_wait is true, but that
> +	 * is an implementation detail and we make it explicit here for
> +	 * ease of reading.
> +	 */
> +	if (work == &work_stack || must_wait) {
> +		bdi_wait_on_work_clear(work);
> +		if (must_wait)
> +			call_rcu(&work->rcu_head, bdi_work_free);
> +	}
>  }
> 
>  /*
> @@ -160,7 +324,7 @@ static void wb_kupdated(struct bdi_writeback *wb)
>  		wbc.more_io = 0;
>  		wbc.encountered_congestion = 0;
>  		wbc.nr_to_write = MAX_WRITEBACK_PAGES;
> -		generic_sync_bdi_inodes(NULL, &wbc);
> +		generic_sync_wb_inodes(wb, NULL, &wbc);
>  		if (wbc.nr_to_write > 0)
>  			break;	/* All the old data is written */
>  		nr_to_write -= MAX_WRITEBACK_PAGES;
> @@ -177,22 +341,19 @@ static inline bool over_bground_thresh(void)
>  		global_page_state(NR_UNSTABLE_NFS) >= background_thresh);
>  }
> 
> -static void generic_sync_wb_inodes(struct bdi_writeback *wb,
> -				   struct super_block *sb,
> -				   struct writeback_control *wbc);
> -
> -static void wb_writeback(struct bdi_writeback *wb)
> +static void __wb_writeback(struct bdi_writeback *wb, long nr_pages,
> +			   struct super_block *sb,
> +			   enum writeback_sync_modes sync_mode)
>  {
>  	struct writeback_control wbc = {
>  		.bdi			= wb->bdi,
> -		.sync_mode		= wb->sync_mode,
> +		.sync_mode		= sync_mode,
>  		.older_than_this	= NULL,
>  		.range_cyclic		= 1,
>  	};
> -	long nr_pages = wb->nr_pages;
> 
>  	for (;;) {
> -		if (wbc.sync_mode == WB_SYNC_NONE && nr_pages <= 0 &&
> +		if (sync_mode == WB_SYNC_NONE && nr_pages <= 0 &&
>  		    !over_bground_thresh())
>  			break;
> 
> @@ -200,7 +361,7 @@ static void wb_writeback(struct bdi_writeback *wb)
>  		wbc.encountered_congestion = 0;
>  		wbc.nr_to_write = MAX_WRITEBACK_PAGES;
>  		wbc.pages_skipped = 0;
> -		generic_sync_wb_inodes(wb, wb->sb, &wbc);
> +		generic_sync_wb_inodes(wb, sb, &wbc);
>  		nr_pages -= MAX_WRITEBACK_PAGES - wbc.nr_to_write;
>  		/*
>  		 * If we ran out of stuff to write, bail unless more_io got set
> @@ -214,68 +375,175 @@ static void wb_writeback(struct bdi_writeback *wb)
>  }
> 
>  /*
> + * Return the next bdi_work struct that hasn't been processed by this
> + * wb thread yet
> + */
> +static struct bdi_work *get_next_work_item(struct backing_dev_info *bdi,
> +					   struct bdi_writeback *wb)
> +{
> +	struct bdi_work *work, *ret = NULL;
> +
> +	rcu_read_lock();
> +
> +	list_for_each_entry_rcu(work, &bdi->work_list, list) {
> +		if (!test_and_clear_bit(wb->nr, &work->seen))
> +			continue;
> +
> +		ret = work;
> +		break;
> +	}
> +
> +	rcu_read_unlock();
> +	return ret;
> +}
> +
> +/*
> + * Retrieve work items and do the writeback they describe
> + */
> +static void wb_writeback(struct bdi_writeback *wb)
> +{
> +	struct backing_dev_info *bdi = wb->bdi;
> +	struct bdi_work *work;
> +
> +	while ((work = get_next_work_item(bdi, wb)) != NULL) {
> +		struct super_block *sb = bdi_work_sb(work);
> +		long nr_pages = work->nr_pages;
> +		enum writeback_sync_modes sync_mode = work->sync_mode;
> +
> +		/*
> +		 * If this isn't a data integrity operation, just notify
> +		 * that we have seen this work and we are now starting it.
> +		 */
> +		if (sync_mode == WB_SYNC_NONE)
> +			wb_clear_pending(wb, work);
> +
> +		__wb_writeback(wb, nr_pages, sb, sync_mode);
> +
> +		/*
> +		 * This is a data integrity writeback, so only do the
> +		 * notification when we have completed the work.
> +		 */
> +		if (sync_mode == WB_SYNC_ALL)
> +			wb_clear_pending(wb, work);
> +	}
> +}
> +
> +/*
> + * This will be inlined in bdi_writeback_task() once we get rid of any
> + * dirty inodes on the default_backing_dev_info
> + */
> +static void wb_do_writeback(struct bdi_writeback *wb)
> +{
> +	/*
> +	 * We get here in two cases:
> +	 *
> +	 *  schedule_timeout() returned because the dirty writeback
> +	 *  interval has elapsed. If that happens, the work item list
> +	 *  will be empty and we will proceed to do kupdated style writeout.
> +	 *
> +	 *  Someone called bdi_start_writeback(), which put one/more work
> +	 *  items on the work_list. Process those.
> +	 */
> +	if (list_empty(&wb->bdi->work_list))
> +		wb_kupdated(wb);
> +	else
> +		wb_writeback(wb);
> +}
> +
> +/*
>   * Handle writeback of dirty data for the device backed by this bdi. Also
>   * wakes up periodically and does kupdated style flushing.
>   */
>  int bdi_writeback_task(struct bdi_writeback *wb)
>  {
> +	DEFINE_WAIT(wait);
> +
>  	while (!kthread_should_stop()) {
>  		unsigned long wait_jiffies;
> -		DEFINE_WAIT(wait);
> +
> +		wb_do_writeback(wb);
> 
>  		prepare_to_wait(&wb->wait, &wait, TASK_INTERRUPTIBLE);
>  		wait_jiffies = msecs_to_jiffies(dirty_writeback_interval * 10);
>  		schedule_timeout(wait_jiffies);
>  		try_to_freeze();
> -
> -		/*
> -		 * We get here in two cases:
> -		 *
> -		 *  schedule_timeout() returned because the dirty writeback
> -		 *  interval has elapsed. If that happens, we will be able
> -		 *  to acquire the writeback lock and will proceed to do
> -		 *  kupdated style writeout.
> -		 *
> -		 *  Someone called bdi_start_writeback(), which will acquire
> -		 *  the writeback lock. This means our writeback_acquire()
> -		 *  below will fail and we call into bdi_pdflush() for
> -		 *  pdflush style writeout.
> -		 *
> -		 */
> -		if (writeback_acquire(wb))
> -			wb_kupdated(wb);
> -		else
> -			wb_writeback(wb);
> -
> -		writeback_release(wb);
> -		finish_wait(&wb->wait, &wait);
>  	}
> 
> +	finish_wait(&wb->wait, &wait);
>  	return 0;
>  }
> 
> +/*
> + * Schedule writeback for all backing devices. Expensive! If this is a data
> + * integrity operation, writeback will be complete when this returns. If
> + * we are simply called for WB_SYNC_NONE, then writeback will merely be
> + * scheduled to run.
> + */
>  void bdi_writeback_all(struct super_block *sb, long nr_pages,
>  		       enum writeback_sync_modes sync_mode)
>  {
> +	const bool must_wait = sync_mode == WB_SYNC_ALL;
>  	struct backing_dev_info *bdi, *tmp;
> +	struct bdi_work *work;
> +	LIST_HEAD(list);
> 
>  	mutex_lock(&bdi_lock);
> 
>  	list_for_each_entry_safe(bdi, tmp, &bdi_list, bdi_list) {
> +		struct bdi_work *work, work_stack;
> +
>  		if (!bdi_has_dirty_io(bdi))
>  			continue;
> -		bdi_start_writeback(bdi, sb, nr_pages, sync_mode);
> +
> +		work = bdi_alloc_work(sb, nr_pages, sync_mode);
> +		if (!work) {
> +			work = &work_stack;
> +			bdi_work_init_on_stack(work, sb, nr_pages, sync_mode);
> +		} else if (must_wait)
> +			list_add_tail(&work->wait_list, &list);
> +
> +		bdi_queue_work(bdi, work);
> +		__bdi_start_work(bdi, work);
> +
> +		/*
> +		 * Do the wait inline if this came from the stack. This
> +		 * only happens if we ran out of memory, so should very
> +		 * rarely trigger.
> +		 */
> +		if (work == &work_stack) {
> +			bdi_wait_on_work_clear(work);
> +			if (must_wait)
> +				call_rcu(&work->rcu_head, bdi_work_free);
> +		}
>  	}
> 
>  	mutex_unlock(&bdi_lock);
> +
> +	/*
> +	 * If this is for WB_SYNC_ALL, wait for pending work to complete
> +	 * before returning.
> +	 */
> +	while (!list_empty(&list)) {
> +		work = list_entry(list.next, struct bdi_work, wait_list);
> +		list_del(&work->wait_list);
> +		bdi_wait_on_work_clear(work);
> +		call_rcu(&work->rcu_head, bdi_work_free);
> +	}
>  }
> 
>  /*
> - * We have only a single wb per bdi, so just return that.
> + * If the filesystem didn't provide a way to map an inode to a dedicated
> + * flusher thread, it doesn't support more than 1 thread. So we know it's
> + * the default thread, return that.
>   */
>  static inline struct bdi_writeback *inode_get_wb(struct inode *inode)
>  {
> -	return &inode_to_bdi(inode)->wb;
> +	const struct super_operations *sop = inode->i_sb->s_op;
> +
> +	if (!sop->inode_get_wb)
> +		return &inode_to_bdi(inode)->wb;
> +
> +	return sop->inode_get_wb(inode);
>  }
> 
>  /**
> @@ -729,8 +997,24 @@ void generic_sync_bdi_inodes(struct super_block *sb,
>  			     struct writeback_control *wbc)
>  {
>  	struct backing_dev_info *bdi = wbc->bdi;
> +	struct bdi_writeback *wb;
> 
> -	generic_sync_wb_inodes(&bdi->wb, sb, wbc);
> +	/*
> +	 * Common case is just a single wb thread and that is embedded in
> +	 * the bdi, so it doesn't need locking
> +	 */
> +	if (!bdi_wblist_needs_lock(bdi))
> +		generic_sync_wb_inodes(&bdi->wb, sb, wbc);
> +	else {
> +		int idx;
> +
> +		idx = srcu_read_lock(&bdi->srcu);
> +
> +		list_for_each_entry_rcu(wb, &bdi->wb_list, list)
> +			generic_sync_wb_inodes(wb, sb, wbc);
> +
> +		srcu_read_unlock(&bdi->srcu, idx);
> +	}
>  }
> 
>  /*
> @@ -757,7 +1041,7 @@ void generic_sync_sb_inodes(struct super_block *sb,
>  				struct writeback_control *wbc)
>  {
>  	if (wbc->bdi)
> -		generic_sync_bdi_inodes(sb, wbc);
> +		bdi_start_writeback(wbc->bdi, sb, wbc->nr_to_write, wbc->sync_mode);
>  	else
>  		bdi_writeback_all(sb, wbc->nr_to_write, wbc->sync_mode);
> 
> diff --git a/include/linux/backing-dev.h b/include/linux/backing-dev.h
> index 77dc62c..0559cf8 100644
> --- a/include/linux/backing-dev.h
> +++ b/include/linux/backing-dev.h
> @@ -13,6 +13,8 @@
>  #include <linux/proportions.h>
>  #include <linux/kernel.h>
>  #include <linux/fs.h>
> +#include <linux/sched.h>
> +#include <linux/srcu.h>
>  #include <linux/writeback.h>
>  #include <asm/atomic.h>
> 
> @@ -26,6 +28,7 @@ struct dentry;
>  enum bdi_state {
>  	BDI_pending,		/* On its way to being activated */
>  	BDI_wb_alloc,		/* Default embedded wb allocated */
> +	BDI_wblist_lock,	/* bdi->wb_list now needs locking */
>  	BDI_async_congested,	/* The async (write) queue is getting full */
>  	BDI_sync_congested,	/* The sync queue is getting full */
>  	BDI_unused,		/* Available bits start here */
> @@ -42,6 +45,8 @@ enum bdi_stat_item {
>  #define BDI_STAT_BATCH (8*(1+ilog2(nr_cpu_ids)))
> 
>  struct bdi_writeback {
> +	struct list_head list;			/* hangs off the bdi */
> +
>  	struct backing_dev_info *bdi;		/* our parent bdi */
>  	unsigned int nr;
> 
> @@ -50,13 +55,12 @@ struct bdi_writeback {
>  	struct list_head	b_dirty;	/* dirty inodes */
>  	struct list_head	b_io;		/* parked for writeback */
>  	struct list_head	b_more_io;	/* parked for more writeback */
> -
> -	unsigned long		nr_pages;
> -	struct super_block	*sb;
> -	enum writeback_sync_modes sync_mode;
>  };
> 
> +#define BDI_MAX_FLUSHERS	32
> +
>  struct backing_dev_info {
> +	struct srcu_struct srcu; /* for wb_list read side protection */
>  	struct list_head bdi_list;
>  	unsigned long ra_pages;	/* max readahead in PAGE_CACHE_SIZE units */
>  	unsigned long state;	/* Always use atomic bitops on this */
> @@ -75,8 +79,12 @@ struct backing_dev_info {
>  	unsigned int max_ratio, max_prop_frac;
> 
>  	struct bdi_writeback wb;  /* default writeback info for this bdi */
> -	unsigned long wb_active;  /* bitmap of active tasks */
> -	unsigned long wb_mask;	  /* number of registered tasks */
> +	spinlock_t wb_lock;	  /* protects update side of wb_list */
> +	struct list_head wb_list; /* the flusher threads hanging off this bdi */
> +	unsigned long wb_mask;	  /* bitmask of registered tasks */
> +	unsigned int wb_cnt;	  /* number of registered tasks */
> +
> +	struct list_head work_list;
> 
>  	struct device *dev;
> 
> @@ -93,17 +101,23 @@ int bdi_register(struct backing_dev_info *bdi, struct device *parent,
>  		const char *fmt, ...);
>  int bdi_register_dev(struct backing_dev_info *bdi, dev_t dev);
>  void bdi_unregister(struct backing_dev_info *bdi);
> -int bdi_start_writeback(struct backing_dev_info *bdi, struct super_block *sb,
> +void bdi_start_writeback(struct backing_dev_info *bdi, struct super_block *sb,
>  			 long nr_pages, enum writeback_sync_modes sync_mode);
>  int bdi_writeback_task(struct bdi_writeback *wb);
>  void bdi_writeback_all(struct super_block *sb, long nr_pages,
>  			enum writeback_sync_modes sync_mode);
>  void bdi_add_default_flusher_task(struct backing_dev_info *bdi);
> +void bdi_add_flusher_task(struct backing_dev_info *bdi);
>  int bdi_has_dirty_io(struct backing_dev_info *bdi);
> 
>  extern struct mutex bdi_lock;
>  extern struct list_head bdi_list;
> 
> +static inline int bdi_wblist_needs_lock(struct backing_dev_info *bdi)
> +{
> +	return test_bit(BDI_wblist_lock, &bdi->state);
> +}
> +
>  static inline int wb_has_dirty_io(struct bdi_writeback *wb)
>  {
>  	return !list_empty(&wb->b_dirty) ||
> @@ -316,4 +330,10 @@ static inline bool mapping_cap_swap_backed(struct address_space *mapping)
>  	return bdi_cap_swap_backed(mapping->backing_dev_info);
>  }
> 
> +static inline int bdi_sched_wait(void *word)
> +{
> +	schedule();
> +	return 0;
> +}
> +
>  #endif		/* _LINUX_BACKING_DEV_H */
> diff --git a/include/linux/fs.h b/include/linux/fs.h
> index ecdc544..d3bda5d 100644
> --- a/include/linux/fs.h
> +++ b/include/linux/fs.h
> @@ -1550,11 +1550,14 @@ extern ssize_t vfs_readv(struct file *, const struct iovec __user *,
>  extern ssize_t vfs_writev(struct file *, const struct iovec __user *,
>  		unsigned long, loff_t *);
> 
> +struct bdi_writeback;
> +
>  struct super_operations {
>     	struct inode *(*alloc_inode)(struct super_block *sb);
>  	void (*destroy_inode)(struct inode *);
> 
>     	void (*dirty_inode) (struct inode *);
> +	struct bdi_writeback *(*inode_get_wb) (struct inode *);
>  	int (*write_inode) (struct inode *, int);
>  	void (*drop_inode) (struct inode *);
>  	void (*delete_inode) (struct inode *);
> diff --git a/mm/backing-dev.c b/mm/backing-dev.c
> index c8201f0..57e44e3 100644
> --- a/mm/backing-dev.c
> +++ b/mm/backing-dev.c
> @@ -199,53 +199,96 @@ static int __init default_bdi_init(void)
>  }
>  subsys_initcall(default_bdi_init);
> 
> -static void bdi_wb_init(struct bdi_writeback *wb, struct backing_dev_info *bdi)
> +static int wb_assign_nr(struct backing_dev_info *bdi, struct bdi_writeback *wb)
>  {
> -	memset(wb, 0, sizeof(*wb));
> +	unsigned long mask = BDI_MAX_FLUSHERS - 1;
> +	unsigned int nr;
> 
> -	wb->bdi = bdi;
> -	init_waitqueue_head(&wb->wait);
> -	INIT_LIST_HEAD(&wb->b_dirty);
> -	INIT_LIST_HEAD(&wb->b_io);
> -	INIT_LIST_HEAD(&wb->b_more_io);
> -}
> +	do {
> +		if ((bdi->wb_mask & mask) == mask)
> +			return 1;
> +
> +		nr = find_first_zero_bit(&bdi->wb_mask, BDI_MAX_FLUSHERS);
> +	} while (test_and_set_bit(nr, &bdi->wb_mask));
> +
> +	wb->nr = nr;
> +
> +	spin_lock(&bdi->wb_lock);
> +	bdi->wb_cnt++;
> +	spin_unlock(&bdi->wb_lock);
> 
> -static int wb_assign_nr(struct backing_dev_info *bdi, struct bdi_writeback *wb)
> -{
> -	set_bit(0, &bdi->wb_mask);
> -	wb->nr = 0;
>  	return 0;
>  }
> 
>  static void bdi_put_wb(struct backing_dev_info *bdi, struct bdi_writeback *wb)
>  {
>  	clear_bit(wb->nr, &bdi->wb_mask);
> -	clear_bit(BDI_wb_alloc, &bdi->state);
> +
> +	if (wb == &bdi->wb)
> +		clear_bit(BDI_wb_alloc, &bdi->state);
> +	else
> +		kfree(wb);
> +
> +	spin_lock(&bdi->wb_lock);
> +	bdi->wb_cnt--;
> +	spin_unlock(&bdi->wb_lock);
> +}
> +
> +static int bdi_wb_init(struct bdi_writeback *wb, struct backing_dev_info *bdi)
> +{
> +	memset(wb, 0, sizeof(*wb));
> +
> +	wb->bdi = bdi;
> +	init_waitqueue_head(&wb->wait);
> +	INIT_LIST_HEAD(&wb->b_dirty);
> +	INIT_LIST_HEAD(&wb->b_io);
> +	INIT_LIST_HEAD(&wb->b_more_io);
> +
> +	return wb_assign_nr(bdi, wb);
>  }
> 
>  static struct bdi_writeback *bdi_new_wb(struct backing_dev_info *bdi)
>  {
>  	struct bdi_writeback *wb;
> 
> -	set_bit(BDI_wb_alloc, &bdi->state);
> -	wb = &bdi->wb;
> -	wb_assign_nr(bdi, wb);
> +	/*
> +	 * Default bdi->wb is already assigned, so just return it
> +	 */
> +	if (!test_and_set_bit(BDI_wb_alloc, &bdi->state))
> +		wb = &bdi->wb;
> +	else {
> +		wb = kmalloc(sizeof(struct bdi_writeback), GFP_KERNEL);
> +		if (wb) {
> +			if (bdi_wb_init(wb, bdi)) {
> +				kfree(wb);
> +				wb = NULL;
> +			}
> +		}
> +	}
> +
>  	return wb;
>  }
> 
> -static int bdi_start_fn(void *ptr)
> +static void bdi_task_init(struct backing_dev_info *bdi,
> +			  struct bdi_writeback *wb)
>  {
> -	struct bdi_writeback *wb = ptr;
> -	struct backing_dev_info *bdi = wb->bdi;
>  	struct task_struct *tsk = current;
> -	int ret;
> +	int was_empty;
> 
>  	/*
> -	 * Add us to the active bdi_list
> +	 * Add us to the active bdi_list. If we are adding threads beyond
> +	 * the default embedded bdi_writeback, then we need to start using
> +	 * proper locking. Check the list for empty first, then set the
> +	 * BDI_wblist_lock flag if there's > 1 entry on the list now
>  	 */
> -	mutex_lock(&bdi_lock);
> -	list_add(&bdi->bdi_list, &bdi_list);
> -	mutex_unlock(&bdi_lock);
> +	spin_lock(&bdi->wb_lock);
> +
> +	was_empty = list_empty(&bdi->wb_list);
> +	list_add_tail_rcu(&wb->list, &bdi->wb_list);
> +	if (!was_empty)
> +		set_bit(BDI_wblist_lock, &bdi->state);
> +
> +	spin_unlock(&bdi->wb_lock);
> 
>  	tsk->flags |= PF_FLUSHER | PF_SWAPWRITE;
>  	set_freezable();
> @@ -254,6 +297,22 @@ static int bdi_start_fn(void *ptr)
>  	 * Our parent may run at a different priority, just set us to normal
>  	 */
>  	set_user_nice(tsk, 0);
> +}
> +
> +static int bdi_start_fn(void *ptr)
> +{
> +	struct bdi_writeback *wb = ptr;
> +	struct backing_dev_info *bdi = wb->bdi;
> +	int ret;
> +
> +	/*
> +	 * Add us to the active bdi_list
> +	 */
> +	mutex_lock(&bdi_lock);
> +	list_add(&bdi->bdi_list, &bdi_list);
> +	mutex_unlock(&bdi_lock);
> +
> +	bdi_task_init(bdi, wb);
> 
>  	/*
>  	 * Clear pending bit and wakeup anybody waiting to tear us down
> @@ -264,13 +323,44 @@ static int bdi_start_fn(void *ptr)
> 
>  	ret = bdi_writeback_task(wb);
> 
> +	/*
> +	 * Remove us from the list
> +	 */
> +	spin_lock(&bdi->wb_lock);
> +	list_del_rcu(&wb->list);
> +	spin_unlock(&bdi->wb_lock);
> +
> +	/*
> +	 * wait for rcu grace period to end, so we can free wb
> +	 */
> +	synchronize_srcu(&bdi->srcu);
> +
>  	bdi_put_wb(bdi, wb);
>  	return ret;
>  }
> 
>  int bdi_has_dirty_io(struct backing_dev_info *bdi)
>  {
> -	return wb_has_dirty_io(&bdi->wb);
> +	struct bdi_writeback *wb;
> +	int ret = 0;
> +
> +	if (!bdi_wblist_needs_lock(bdi))
> +		ret = wb_has_dirty_io(&bdi->wb);
> +	else {
> +		int idx;
> +
> +		idx = srcu_read_lock(&bdi->srcu);
> +
> +		list_for_each_entry_rcu(wb, &bdi->wb_list, list) {
> +			ret = wb_has_dirty_io(wb);
> +			if (ret)
> +				break;
> +		}
> +
> +		srcu_read_unlock(&bdi->srcu, idx);
> +	}
> +
> +	return ret;
>  }
> 
>  static void bdi_flush_io(struct backing_dev_info *bdi)
> @@ -291,6 +381,8 @@ static int bdi_forker_task(void *ptr)
>  	struct bdi_writeback *me = ptr;
>  	DEFINE_WAIT(wait);
> 
> +	bdi_task_init(me->bdi, me);
> +
>  	for (;;) {
>  		struct backing_dev_info *bdi, *tmp;
>  		struct bdi_writeback *wb;
> @@ -371,27 +463,70 @@ readd_flush:
>  }
> 
>  /*
> - * Add a new flusher task that gets created for any bdi
> - * that has dirty data pending writeout
> + * bdi_lock held on entry
>   */
> -void bdi_add_default_flusher_task(struct backing_dev_info *bdi)
> +static void bdi_add_one_flusher_task(struct backing_dev_info *bdi,
> +				     int(*func)(struct backing_dev_info *))
>  {
>  	if (!bdi_cap_writeback_dirty(bdi))
>  		return;
> 
>  	/*
> -	 * Someone already marked this pending for task creation
> +	 * Check with the helper whether to proceed adding a task. Will only
> +	 * abort if two or more simultaneous calls to
> +	 * bdi_add_default_flusher_task() occurred; further additions will block
> +	 * waiting for previous additions to finish.
>  	 */
> -	if (test_and_set_bit(BDI_pending, &bdi->state))
> -		return;
> +	if (!func(bdi)) {
> +		list_move_tail(&bdi->bdi_list, &bdi_pending_list);
> 
> -	mutex_lock(&bdi_lock);
> -	list_move_tail(&bdi->bdi_list, &bdi_pending_list);
> +		/*
> +		 * We are now on the pending list, wake up bdi_forker_task()
> +		 * to finish the job and add us back to the active bdi_list
> +		 */
> +		wake_up(&default_backing_dev_info.wb.wait);
> +	}
> +}
> +
> +static int flusher_add_helper_block(struct backing_dev_info *bdi)
> +{
>  	mutex_unlock(&bdi_lock);
> +	wait_on_bit_lock(&bdi->state, BDI_pending, bdi_sched_wait,
> +				TASK_UNINTERRUPTIBLE);
> +	mutex_lock(&bdi_lock);
> +	return 0;
> +}
> 
> -	wake_up(&default_backing_dev_info.wb.wait);
> +static int flusher_add_helper_test(struct backing_dev_info *bdi)
> +{
> +	return test_and_set_bit(BDI_pending, &bdi->state);
> +}
> +
> +/*
> + * Add the default flusher task that gets created for any bdi
> + * that has dirty data pending writeout
> + */
> +void bdi_add_default_flusher_task(struct backing_dev_info *bdi)
> +{
> +	bdi_add_one_flusher_task(bdi, flusher_add_helper_test);
>  }
> 
> +/**
> + * bdi_add_flusher_task - add one more flusher task to this @bdi
> + *  @bdi:	the bdi
> + *
> + * Add an additional flusher task to this @bdi. Will block waiting on
> + * previous additions, if any.
> + *
> + */
> +void bdi_add_flusher_task(struct backing_dev_info *bdi)
> +{
> +	mutex_lock(&bdi_lock);
> +	bdi_add_one_flusher_task(bdi, flusher_add_helper_block);
> +	mutex_unlock(&bdi_lock);
> +}
> +EXPORT_SYMBOL(bdi_add_flusher_task);
> +
>  int bdi_register(struct backing_dev_info *bdi, struct device *parent,
>  		const char *fmt, ...)
>  {
> @@ -455,24 +590,21 @@ int bdi_register_dev(struct backing_dev_info *bdi, dev_t dev)
>  }
>  EXPORT_SYMBOL(bdi_register_dev);
> 
> -static int sched_wait(void *word)
> -{
> -	schedule();
> -	return 0;
> -}
> -
>  /*
>   * Remove bdi from global list and shutdown any threads we have running
>   */
>  static void bdi_wb_shutdown(struct backing_dev_info *bdi)
>  {
> +	struct bdi_writeback *wb;
> +
>  	if (!bdi_cap_writeback_dirty(bdi))
>  		return;
> 
>  	/*
>  	 * If setup is pending, wait for that to complete first
>  	 */
> -	wait_on_bit(&bdi->state, BDI_pending, sched_wait, TASK_UNINTERRUPTIBLE);
> +	wait_on_bit(&bdi->state, BDI_pending, bdi_sched_wait,
> +			TASK_UNINTERRUPTIBLE);
> 
>  	/*
>  	 * Make sure nobody finds us on the bdi_list anymore
> @@ -482,9 +614,11 @@ static void bdi_wb_shutdown(struct backing_dev_info *bdi)
>  	mutex_unlock(&bdi_lock);
> 
>  	/*
> -	 * Finally, kill the kernel thread
> +	 * Finally, kill the kernel threads. We don't need to be RCU
> +	 * safe anymore, since the bdi is gone from visibility.
>  	 */
> -	kthread_stop(bdi->wb.task);
> +	list_for_each_entry(wb, &bdi->wb_list, list)
> +		kthread_stop(wb->task);
>  }
> 
>  void bdi_unregister(struct backing_dev_info *bdi)
> @@ -508,8 +642,12 @@ int bdi_init(struct backing_dev_info *bdi)
>  	bdi->min_ratio = 0;
>  	bdi->max_ratio = 100;
>  	bdi->max_prop_frac = PROP_FRAC_BASE;
> +	spin_lock_init(&bdi->wb_lock);
> +	bdi->wb_mask = 0;
> +	bdi->wb_cnt = 0;
>  	INIT_LIST_HEAD(&bdi->bdi_list);
> -	bdi->wb_mask = bdi->wb_active = 0;
> +	INIT_LIST_HEAD(&bdi->wb_list);
> +	INIT_LIST_HEAD(&bdi->work_list);
> 
>  	bdi_wb_init(&bdi->wb, bdi);
> 
> @@ -519,10 +657,15 @@ int bdi_init(struct backing_dev_info *bdi)
>  			goto err;
>  	}
> 
> +	err = init_srcu_struct(&bdi->srcu);
> +	if (err)
> +		goto err;
> +
>  	bdi->dirty_exceeded = 0;
>  	err = prop_local_init_percpu(&bdi->completions);
> 
>  	if (err) {
> +		cleanup_srcu_struct(&bdi->srcu);
>  err:
>  		while (i--)
>  			percpu_counter_destroy(&bdi->bdi_stat[i]);
> @@ -540,6 +683,8 @@ void bdi_destroy(struct backing_dev_info *bdi)
> 
>  	bdi_unregister(bdi);
> 
> +	cleanup_srcu_struct(&bdi->srcu);
> +
>  	for (i = 0; i < NR_BDI_STAT_ITEMS; i++)
>  		percpu_counter_destroy(&bdi->bdi_stat[i]);
> 
> diff --git a/mm/page-writeback.c b/mm/page-writeback.c
> index 54a4a65..7dd7de7 100644
> --- a/mm/page-writeback.c
> +++ b/mm/page-writeback.c
> @@ -665,8 +665,7 @@ void throttle_vm_writeout(gfp_t gfp_mask)
> 
>  /*
>   * Start writeback of `nr_pages' pages.  If `nr_pages' is zero, write back
> - * the whole world.  Returns 0 if a pdflush thread was dispatched.  Returns
> - * -1 if all pdflush threads were busy.
> + * the whole world.
>   */
>  void wakeup_flusher_threads(long nr_pages)
>  {
> @@ -674,7 +673,6 @@ void wakeup_flusher_threads(long nr_pages)
>  		nr_pages = global_page_state(NR_FILE_DIRTY) +
>  				global_page_state(NR_UNSTABLE_NFS);
>  	bdi_writeback_all(NULL, nr_pages, WB_SYNC_NONE);
> -	return;
>  }
> 
>  static void laptop_timer_fn(unsigned long unused);
> -- 
> 1.6.3.rc0.1.gf800
> 
> --
> To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
> the body of a message to majordomo@xxxxxxxxxxxxxxx
> More majordomo info at  http://vger.kernel.org/majordomo-info.html
> Please read the FAQ at  http://www.tux.org/lkml/
--
To unsubscribe from this list: send the line "unsubscribe linux-fsdevel" in
the body of a message to majordomo@xxxxxxxxxxxxxxx
More majordomo info at  http://vger.kernel.org/majordomo-info.html

[Index of Archives]     [Linux Ext4 Filesystem]     [Union Filesystem]     [Filesystem Testing]     [Ceph Users]     [Ecryptfs]     [AutoFS]     [Kernel Newbies]     [Share Photos]     [Security]     [Netfilter]     [Bugtraq]     [Yosemite News]     [MIPS Linux]     [ARM Linux]     [Linux Security]     [Linux Cachefs]     [Reiser Filesystem]     [Linux RAID]     [Samba]     [Device Mapper]     [CEPH Development]
  Powered by Linux