blk->id read race: was: [PATCH v2 2/3] printk: add lockless buffer

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

 



On Fri 2020-05-01 11:46:09, John Ogness wrote:
> Introduce a multi-reader multi-writer lockless ringbuffer for storing
> the kernel log messages. Readers and writers may use their API from
> any context (including scheduler and NMI). This ringbuffer will make
> it possible to decouple printk() callers from any context, locking,
> or console constraints. It also makes it possible for readers to have
> full access to the ringbuffer contents at any time and context (for
> example from any panic situation).
> 
> --- /dev/null
> +++ b/kernel/printk/printk_ringbuffer.c
> +/*
> + * Given a data ring (text or dict), put the associated descriptor of each
> + * data block from @lpos_begin until @lpos_end into the reusable state.
> + *
> + * If there is any problem making the associated descriptor reusable, either
> + * the descriptor has not yet been committed or another writer task has
> + * already pushed the tail lpos past the problematic data block. Regardless,
> + * on error the caller can re-load the tail lpos to determine the situation.
> + */
> +static bool data_make_reusable(struct printk_ringbuffer *rb,
> +			       struct prb_data_ring *data_ring,
> +			       unsigned long lpos_begin,
> +			       unsigned long lpos_end,
> +			       unsigned long *lpos_out)
> +{
> +	struct prb_desc_ring *desc_ring = &rb->desc_ring;
> +	struct prb_data_blk_lpos *blk_lpos;
> +	struct prb_data_block *blk;
> +	unsigned long tail_lpos;
> +	enum desc_state d_state;
> +	struct prb_desc desc;
> +	unsigned long id;
> +
> +	/*
> +	 * Using the provided @data_ring, point @blk_lpos to the correct
> +	 * blk_lpos within the local copy of the descriptor.
> +	 */
> +	if (data_ring == &rb->text_data_ring)
> +		blk_lpos = &desc.text_blk_lpos;
> +	else
> +		blk_lpos = &desc.dict_blk_lpos;
> +
> +	/* Loop until @lpos_begin has advanced to or beyond @lpos_end. */
> +	while ((lpos_end - lpos_begin) - 1 < DATA_SIZE(data_ring)) {
> +		blk = to_block(data_ring, lpos_begin);
> +		id = READ_ONCE(blk->id); /* LMM(data_make_reusable:A) */

This would deserve some comment:

1. Compiler could not optimize out the read because there is a data
   dependency on lpos_begin.

2. Compiler could not postpone the read because it is followed by
   smp_rmb().

So, is READ_ONCE() realy needed?

Well, blk->id clearly can be modified in parallel so we need to be
careful. There is smp_rmb() right below. Do we needed smp_rmb() also
before?

What about the following scenario?:


CPU0						CPU1

						data_alloc()
						  data_push_tail()

						blk = to_block(data_ring, begin_lpos)
						WRITE_ONCE(blk->id, id); /* LMM(data_alloc:B) */

desc_push_tail()
  data_push_tail()

    tail_lpos = data_ring->tail_lpos;
    // see data_ring->tail_lpos already updated by CPU1

    data_make_reusable()

      // lpos_begin = tail_lpos via parameter
      blk = to_block(data_ring, lpos_begin);
      id = blk->id

Now: CPU0 might see outdated blk->id before CPU1 wrote new value
     because there is no read barrier betwen reading tail_lpos
     and blk->id here.

     The outdated id would cause desc_miss. CPU0 would return back
     to data_push_tail(). It will try to re-read data_ring->tail_lpos.
     But it will be the same as before because it already read the
     updated value.

     As a result, data_alloc() would fail.

IMHO, we need smp_rmb() between data_ring->tail_lpos read and
the related blk->id read. It should be either in data_push_tail()
or in data_make_reusable().

Best Regards,
Petr

PS: I am still in the middle of the review. I think that it is better
to discuss each race separately.

> +		/*
> +		 * Guarantee the block ID is loaded before checking the tail
> +		 * lpos. The loaded block ID can only be considered valid if
> +		 * the tail lpos has not overtaken @lpos_begin. This pairs
> +		 * with data_alloc:A.
> +		 *
> +		 * Memory barrier involvement:
> +		 *
> +		 * If data_make_reusable:A reads from data_alloc:B, then
> +		 * data_make_reusable:C reads from data_push_tail:D.
> +		 *
> +		 * Relies on:
> +		 *
> +		 * MB from data_push_tail:D to data_alloc:B
> +		 *    matching
> +		 * RMB from data_make_reusable:A to data_make_reusable:C
> +		 *
> +		 * Note: data_push_tail:D and data_alloc:B can be different
> +		 *       CPUs. However, the data_alloc:B CPU (which performs
> +		 *       the full memory barrier) must have previously seen
> +		 *       data_push_tail:D.
> +		 */
> +		smp_rmb(); /* LMM(data_make_reusable:B) */
> +
> +		tail_lpos = atomic_long_read(&data_ring->tail_lpos
> +					); /* LMM(data_make_reusable:C) */
> +
> +		/*
> +		 * If @lpos_begin has fallen behind the tail lpos, the read
> +		 * block ID cannot be trusted. Fast forward @lpos_begin to the
> +		 * tail lpos and try again.
> +		 */
> +		if (lpos_begin - tail_lpos >= DATA_SIZE(data_ring)) {
> +			lpos_begin = tail_lpos;
> +			continue;
> +		}
> +
> +		d_state = desc_read(desc_ring, id,
> +				    &desc); /* LMM(data_make_reusable:D) */
> +
> +		switch (d_state) {
> +		case desc_miss:
> +			return false;
> +		case desc_reserved:
> +			return false;
> +		case desc_committed:
> +			/*
> +			 * This data block is invalid if the descriptor
> +			 * does not point back to it.
> +			 */
> +			if (blk_lpos->begin != lpos_begin)
> +				return false;
> +			desc_make_reusable(desc_ring, id);
> +			break;
> +		case desc_reusable:
> +			/*
> +			 * This data block is invalid if the descriptor
> +			 * does not point back to it.
> +			 */
> +			if (blk_lpos->begin != lpos_begin)
> +				return false;
> +			break;
> +		}
> +
> +		/* Advance @lpos_begin to the next data block. */
> +		lpos_begin = blk_lpos->next;
> +	}
> +
> +	*lpos_out = lpos_begin;
> +	return true;
> +}
> +
> +/*
> + * Advance the data ring tail to at least @lpos. This function puts
> + * descriptors into the reusable state if the tail is pushed beyond
> + * their associated data block.
> + */
> +static bool data_push_tail(struct printk_ringbuffer *rb,
> +			   struct prb_data_ring *data_ring,
> +			   unsigned long lpos)
> +{
> +	unsigned long tail_lpos;
> +	unsigned long next_lpos;
> +
> +	/* If @lpos is not valid, there is nothing to do. */
> +	if (lpos == INVALID_LPOS)
> +		return true;
> +
> +	tail_lpos = atomic_long_read(&data_ring->tail_lpos);
> +
> +	do {
> +		/* Done, if the tail lpos is already at or beyond @lpos. */
> +		if ((lpos - tail_lpos) - 1 >= DATA_SIZE(data_ring))
> +			break;
> +
> +		/*
> +		 * Make all descriptors reusable that are associated with
> +		 * data blocks before @lpos.
> +		 */
> +		if (!data_make_reusable(rb, data_ring, tail_lpos, lpos,
> +					&next_lpos)) {
> +			/*
> +			 * Guarantee the descriptor state loaded in
> +			 * data_make_reusable() is performed before reloading
> +			 * the tail lpos. The failed data_make_reusable() may
> +			 * be due to a newly recycled descriptor causing
> +			 * the tail lpos to have been previously pushed. This
> +			 * pairs with desc_reserve:D.
> +			 *
> +			 * Memory barrier involvement:
> +			 *
> +			 * If data_make_reusable:D reads from desc_reserve:G,
> +			 * then data_push_tail:B reads from data_push_tail:D.
> +			 *
> +			 * Relies on:
> +			 *
> +			 * MB from data_push_tail:D to desc_reserve:G
> +			 *    matching
> +			 * RMB from data_make_reusable:D to data_push_tail:B
> +			 *
> +			 * Note: data_push_tail:D and desc_reserve:G can be
> +			 *       different CPUs. However, the desc_reserve:G
> +			 *       CPU (which performs the full memory barrier)
> +			 *       must have previously seen data_push_tail:D.
> +			 */
> +			smp_rmb(); /* LMM(data_push_tail:A) */
> +
> +			next_lpos = atomic_long_read(&data_ring->tail_lpos
> +						); /* LMM(data_push_tail:B) */
> +			if (next_lpos == tail_lpos)
> +				return false;
> +
> +			/* Another task pushed the tail. Try again. */
> +			tail_lpos = next_lpos;
> +			continue;
> +		}
> +
> +		/*
> +		 * Guarantee any descriptor states that have transitioned to
> +		 * reusable are stored before pushing the tail lpos. This
> +		 * allows readers to identify if data has expired while
> +		 * reading the descriptor. This pairs with desc_read:D.
> +		 */
> +		smp_mb(); /* LMM(data_push_tail:C) */
> +
> +	} while (!atomic_long_try_cmpxchg_relaxed(&data_ring->tail_lpos,
> +			&tail_lpos, next_lpos)); /* LMM(data_push_tail:D) */
> +
> +	return true;
> +}
> +

_______________________________________________
kexec mailing list
kexec@xxxxxxxxxxxxxxxxxxx
http://lists.infradead.org/mailman/listinfo/kexec



[Index of Archives]     [LM Sensors]     [Linux Sound]     [ALSA Users]     [ALSA Devel]     [Linux Audio Users]     [Linux Media]     [Kernel]     [Gimp]     [Yosemite News]     [Linux Media]

  Powered by Linux