After r->consumer_head is updated in __ptr_ring_discard_one(),
r->queue[r->consumer_head] has already been cleared in the previous
round of __ptr_ring_discard_one(). But there is no guarantee that
other threads will see r->queue[r->consumer_head] as NULL, because
there is no explicit barrier between clearing r->queue[] and
updating r->consumer_head.

So add two explicit barriers to make sure that the r->queue[]
entries cleared in __ptr_ring_discard_one() are visible to other
CPUs, mainly so that the CPU calling __ptr_ring_empty() sees the
correct r->queue[r->consumer_head].

Hopefully the previous patch and this one ensure the correct
visibility of r->queue[], so update the comment about
__ptr_ring_empty() accordingly.

Tested using "perf stat -r 1000 ./ptr_ring_test -s 1000 -m 1
-N 100000000", comparing the elapsed time:

 arch     unpatched       patched         improvement
 arm64    1.888224 sec    1.893673 sec    -0.2%
 X86      2.5422 sec      2.5587 sec      -0.6%

Reported-by: Michael S. Tsirkin <mst@xxxxxxxxxx>
Signed-off-by: Yunsheng Lin <linyunsheng@xxxxxxxxxx>
---
 include/linux/ptr_ring.h | 29 +++++++++++++++++++----------
 1 file changed, 19 insertions(+), 10 deletions(-)

diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
index db9c282..d78aab8 100644
--- a/include/linux/ptr_ring.h
+++ b/include/linux/ptr_ring.h
@@ -178,15 +178,11 @@ static inline void *__ptr_ring_peek(struct ptr_ring *r)
  *
  * NB: This is only safe to call if ring is never resized.
  *
- * However, if some other CPU consumes ring entries at the same time, the value
- * returned is not guaranteed to be correct.
- *
- * In this case - to avoid incorrectly detecting the ring
- * as empty - the CPU consuming the ring entries is responsible
- * for either consuming all ring entries until the ring is empty,
- * or synchronizing with some other CPU and causing it to
- * re-test __ptr_ring_empty and/or consume the ring enteries
- * after the synchronization point.
+ * caller might need to use the smp_rmb() to pair with smp_wmb()
+ * or smp_store_release() in __ptr_ring_discard_one() and smp_wmb()
+ * in __ptr_ring_produce() to ensure correct ordering between
+ * __ptr_ring_empty() checking and subsequent operation after
+ * __ptr_ring_empty() checking.
  *
  * Note: callers invoking this in a loop must use a compiler barrier,
  * for example cpu_relax().
@@ -274,7 +270,12 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
 		if (unlikely(consumer_head >= r->size)) {
 			r->consumer_tail = 0;
-			WRITE_ONCE(r->consumer_head, 0);
+
+			/* Make sure r->queue[0] ~ r->queue[r->consumer_tail]
+			 * cleared in previous __ptr_ring_discard_one() is
+			 * visible to other cpu.
+			 */
+			smp_store_release(&r->consumer_head, 0);
 		} else {
 			r->consumer_tail = consumer_head;
 			WRITE_ONCE(r->consumer_head, consumer_head);
@@ -288,6 +289,14 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
 		while (likely(--consumer_head >= tail))
 			r->queue[consumer_head] = NULL;
+		if (unlikely(!r->consumer_head)) {
+			/* Make sure r->queue[r->consumer_tail] ~
+			 * r->queue[r->size - 1] cleared above is visible to
+			 * other cpu.
+			 */
+			smp_wmb();
+		}
+
+		return;
 	}
-- 
2.7.4
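
For readers who want to experiment with the ordering described above
outside the kernel, here is a minimal userspace sketch using C11
atomics. It is not the ptr_ring code: struct demo_ring and every
demo_* name are hypothetical, a release store stands in for
smp_store_release()/smp_wmb(), and an acquire load stands in for the
smp_rmb() a caller of __ptr_ring_empty() might need. Assume these
mappings are only approximate.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

#define DEMO_RING_SIZE 4

/* Hypothetical stand-in for struct ptr_ring; not the kernel type. */
struct demo_ring {
	_Atomic(void *) queue[DEMO_RING_SIZE];
	atomic_int consumer_head;
};

static void demo_ring_init(struct demo_ring *r)
{
	for (int i = 0; i < DEMO_RING_SIZE; i++)
		atomic_init(&r->queue[i], NULL);
	atomic_init(&r->consumer_head, 0);
}

/* Producer side: publish a pointer into a slot with release ordering
 * (rough analogue of smp_wmb() followed by the store in the producer).
 */
static void demo_produce(struct demo_ring *r, int slot, void *ptr)
{
	assert(slot >= 0 && slot < DEMO_RING_SIZE);
	atomic_store_explicit(&r->queue[slot], ptr, memory_order_release);
}

/* Consumer side, step 1: clear a consumed slot. Relaxed on its own. */
static void demo_clear_slot(struct demo_ring *r, int slot)
{
	assert(slot >= 0 && slot < DEMO_RING_SIZE);
	atomic_store_explicit(&r->queue[slot], NULL, memory_order_relaxed);
}

/* Consumer side, step 2: publish the new head with release ordering,
 * so slots cleared earlier are visible to anyone who observes this
 * head value with an acquire load.
 */
static void demo_publish_head(struct demo_ring *r, int head)
{
	assert(head >= 0 && head < DEMO_RING_SIZE);
	atomic_store_explicit(&r->consumer_head, head, memory_order_release);
}

/* Checker side: acquire-load the head, then inspect that slot. */
static bool demo_ring_empty(struct demo_ring *r)
{
	int head = atomic_load_explicit(&r->consumer_head,
					memory_order_acquire);

	return atomic_load_explicit(&r->queue[head],
				    memory_order_relaxed) == NULL;
}
```

The point of the sketch is the pairing: the slot is cleared first, the
head is published with release ordering afterwards, and the checker
reads the head with acquire ordering before looking at the slot.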